Add expected database schema version to RPCs.
- Add schema version to client code with minimum disruption to
API surface.
- This cl is based on following cl
https://vanadium-review.googlesource.com/#/c/13734/
- TODO to port definiton of Schema from go to js.
MultiPart: 2/2
Change-Id: I19ac4764307d65fa5811663b802a95ea94827986
diff --git a/src/gen-vdl/v.io/syncbase/v23/services/syncbase/nosql/index.js b/src/gen-vdl/v.io/syncbase/v23/services/syncbase/nosql/index.js
index 1406445..1e5f77d 100644
--- a/src/gen-vdl/v.io/syncbase/v23/services/syncbase/nosql/index.js
+++ b/src/gen-vdl/v.io/syncbase/v23/services/syncbase/nosql/index.js
@@ -14,6 +14,7 @@
var access = require('./../../../../../v23/security/access');
var permissions = require('./../../../../../v23/services/permissions');
+var watch = require('./../../../../../v23/services/watch');
module.exports = {};
@@ -27,11 +28,8 @@
var _type5 = new vdl.Type();
var _type6 = new vdl.Type();
var _type7 = new vdl.Type();
-var _type8 = new vdl.Type();
var _typeBatchOptions = new vdl.Type();
var _typeBlobRef = new vdl.Type();
-var _typeChange = new vdl.Type();
-var _typeChangeType = new vdl.Type();
var _typeCrPolicy = new vdl.Type();
var _typeCrRule = new vdl.Type();
var _typeFetchState = new vdl.Type();
@@ -39,51 +37,40 @@
var _typeKeyValue = new vdl.Type();
var _typePrefixPermissions = new vdl.Type();
var _typeResolverType = new vdl.Type();
-var _typeResumeMarker = new vdl.Type();
var _typeSchemaMetadata = new vdl.Type();
+var _typeStoreChange = new vdl.Type();
var _typeSyncGroupMemberInfo = new vdl.Type();
var _typeSyncGroupSpec = new vdl.Type();
-var _typeTablePrefixRange = new vdl.Type();
-var _typeWatchRequest = new vdl.Type();
_type1.kind = vdl.kind.LIST;
_type1.name = "";
-_type1.elem = _typeTablePrefixRange;
-_type2.kind = vdl.kind.LIST;
+_type1.elem = vdl.types.STRING;
+_type2.kind = vdl.kind.MAP;
_type2.name = "";
-_type2.elem = vdl.types.BYTE;
+_type2.elem = _typeSyncGroupMemberInfo;
+_type2.key = vdl.types.STRING;
_type3.kind = vdl.kind.LIST;
_type3.name = "";
-_type3.elem = vdl.types.STRING;
-_type4.kind = vdl.kind.MAP;
+_type3.elem = vdl.types.BYTE;
+_type4.kind = vdl.kind.LIST;
_type4.name = "";
-_type4.elem = _typeSyncGroupMemberInfo;
-_type4.key = vdl.types.STRING;
-_type5.kind = vdl.kind.LIST;
+_type4.elem = _typeCrRule;
+_type5.kind = vdl.kind.OPTIONAL;
_type5.name = "";
-_type5.elem = _typeCrRule;
-_type6.kind = vdl.kind.OPTIONAL;
+_type5.elem = _typeSchemaMetadata;
+_type6.kind = vdl.kind.LIST;
_type6.name = "";
-_type6.elem = _typeSchemaMetadata;
+_type6.elem = vdl.types.ANY;
_type7.kind = vdl.kind.LIST;
_type7.name = "";
-_type7.elem = vdl.types.ANY;
-_type8.kind = vdl.kind.LIST;
-_type8.name = "";
-_type8.elem = _typePrefixPermissions;
+_type7.elem = _typePrefixPermissions;
_typeBatchOptions.kind = vdl.kind.STRUCT;
_typeBatchOptions.name = "v.io/syncbase/v23/services/syncbase/nosql.BatchOptions";
_typeBatchOptions.fields = [{name: "Hint", type: vdl.types.STRING}, {name: "ReadOnly", type: vdl.types.BOOL}];
_typeBlobRef.kind = vdl.kind.STRING;
_typeBlobRef.name = "v.io/syncbase/v23/services/syncbase/nosql.BlobRef";
-_typeChange.kind = vdl.kind.STRUCT;
-_typeChange.name = "v.io/syncbase/v23/services/syncbase/nosql.Change";
-_typeChange.fields = [{name: "Table", type: vdl.types.STRING}, {name: "Row", type: vdl.types.STRING}, {name: "ChangeType", type: _typeChangeType}, {name: "Value", type: _type2}, {name: "ResumeMarker", type: _typeResumeMarker}, {name: "FromSync", type: vdl.types.BOOL}, {name: "Continued", type: vdl.types.BOOL}];
-_typeChangeType.kind = vdl.kind.ENUM;
-_typeChangeType.name = "v.io/syncbase/v23/services/syncbase/nosql.ChangeType";
-_typeChangeType.labels = ["Put", "Delete"];
_typeCrPolicy.kind = vdl.kind.STRUCT;
_typeCrPolicy.name = "v.io/syncbase/v23/services/syncbase/nosql.CrPolicy";
-_typeCrPolicy.fields = [{name: "Rules", type: _type5}];
+_typeCrPolicy.fields = [{name: "Rules", type: _type4}];
_typeCrRule.kind = vdl.kind.STRUCT;
_typeCrRule.name = "v.io/syncbase/v23/services/syncbase/nosql.CrRule";
_typeCrRule.fields = [{name: "TableName", type: vdl.types.STRING}, {name: "KeyPrefix", type: vdl.types.STRING}, {name: "Type", type: vdl.types.STRING}, {name: "Resolver", type: _typeResolverType}];
@@ -95,30 +82,25 @@
_typeFetchStatus.fields = [{name: "State", type: _typeFetchState}, {name: "Received", type: vdl.types.UINT64}, {name: "Total", type: vdl.types.UINT64}];
_typeKeyValue.kind = vdl.kind.STRUCT;
_typeKeyValue.name = "v.io/syncbase/v23/services/syncbase/nosql.KeyValue";
-_typeKeyValue.fields = [{name: "Key", type: vdl.types.STRING}, {name: "Value", type: _type2}];
+_typeKeyValue.fields = [{name: "Key", type: vdl.types.STRING}, {name: "Value", type: _type3}];
_typePrefixPermissions.kind = vdl.kind.STRUCT;
_typePrefixPermissions.name = "v.io/syncbase/v23/services/syncbase/nosql.PrefixPermissions";
_typePrefixPermissions.fields = [{name: "Prefix", type: vdl.types.STRING}, {name: "Perms", type: new access.Permissions()._type}];
_typeResolverType.kind = vdl.kind.ENUM;
_typeResolverType.name = "v.io/syncbase/v23/services/syncbase/nosql.ResolverType";
_typeResolverType.labels = ["LastWins", "AppResolves", "Defer"];
-_typeResumeMarker.kind = vdl.kind.STRING;
-_typeResumeMarker.name = "v.io/syncbase/v23/services/syncbase/nosql.ResumeMarker";
_typeSchemaMetadata.kind = vdl.kind.STRUCT;
_typeSchemaMetadata.name = "v.io/syncbase/v23/services/syncbase/nosql.SchemaMetadata";
-_typeSchemaMetadata.fields = [{name: "Version", type: vdl.types.INT64}, {name: "Policy", type: _typeCrPolicy}];
+_typeSchemaMetadata.fields = [{name: "Version", type: vdl.types.INT32}, {name: "Policy", type: _typeCrPolicy}];
+_typeStoreChange.kind = vdl.kind.STRUCT;
+_typeStoreChange.name = "v.io/syncbase/v23/services/syncbase/nosql.StoreChange";
+_typeStoreChange.fields = [{name: "Value", type: _type3}, {name: "FromSync", type: vdl.types.BOOL}];
_typeSyncGroupMemberInfo.kind = vdl.kind.STRUCT;
_typeSyncGroupMemberInfo.name = "v.io/syncbase/v23/services/syncbase/nosql.SyncGroupMemberInfo";
_typeSyncGroupMemberInfo.fields = [{name: "SyncPriority", type: vdl.types.BYTE}];
_typeSyncGroupSpec.kind = vdl.kind.STRUCT;
_typeSyncGroupSpec.name = "v.io/syncbase/v23/services/syncbase/nosql.SyncGroupSpec";
-_typeSyncGroupSpec.fields = [{name: "Description", type: vdl.types.STRING}, {name: "Perms", type: new access.Permissions()._type}, {name: "Prefixes", type: _type3}, {name: "MountTables", type: _type3}, {name: "IsPrivate", type: vdl.types.BOOL}];
-_typeTablePrefixRange.kind = vdl.kind.STRUCT;
-_typeTablePrefixRange.name = "v.io/syncbase/v23/services/syncbase/nosql.TablePrefixRange";
-_typeTablePrefixRange.fields = [{name: "Table", type: vdl.types.STRING}, {name: "Prefix", type: vdl.types.STRING}];
-_typeWatchRequest.kind = vdl.kind.STRUCT;
-_typeWatchRequest.name = "v.io/syncbase/v23/services/syncbase/nosql.WatchRequest";
-_typeWatchRequest.fields = [{name: "Ranges", type: _type1}, {name: "ResumeMarker", type: _typeResumeMarker}];
+_typeSyncGroupSpec.fields = [{name: "Description", type: vdl.types.STRING}, {name: "Perms", type: new access.Permissions()._type}, {name: "Prefixes", type: _type1}, {name: "MountTables", type: _type1}, {name: "IsPrivate", type: vdl.types.BOOL}];
_type1.freeze();
_type2.freeze();
_type3.freeze();
@@ -126,11 +108,8 @@
_type5.freeze();
_type6.freeze();
_type7.freeze();
-_type8.freeze();
_typeBatchOptions.freeze();
_typeBlobRef.freeze();
-_typeChange.freeze();
-_typeChangeType.freeze();
_typeCrPolicy.freeze();
_typeCrRule.freeze();
_typeFetchState.freeze();
@@ -138,19 +117,12 @@
_typeKeyValue.freeze();
_typePrefixPermissions.freeze();
_typeResolverType.freeze();
-_typeResumeMarker.freeze();
_typeSchemaMetadata.freeze();
+_typeStoreChange.freeze();
_typeSyncGroupMemberInfo.freeze();
_typeSyncGroupSpec.freeze();
-_typeTablePrefixRange.freeze();
-_typeWatchRequest.freeze();
module.exports.BatchOptions = (vdl.registry.lookupOrCreateConstructor(_typeBatchOptions));
module.exports.BlobRef = (vdl.registry.lookupOrCreateConstructor(_typeBlobRef));
-module.exports.Change = (vdl.registry.lookupOrCreateConstructor(_typeChange));
-module.exports.ChangeType = {
- PUT: canonicalize.reduce(new (vdl.registry.lookupOrCreateConstructor(_typeChangeType))('Put', true), _typeChangeType),
- DELETE: canonicalize.reduce(new (vdl.registry.lookupOrCreateConstructor(_typeChangeType))('Delete', true), _typeChangeType),
-};
module.exports.CrPolicy = (vdl.registry.lookupOrCreateConstructor(_typeCrPolicy));
module.exports.CrRule = (vdl.registry.lookupOrCreateConstructor(_typeCrRule));
module.exports.FetchState = {
@@ -167,12 +139,10 @@
APP_RESOLVES: canonicalize.reduce(new (vdl.registry.lookupOrCreateConstructor(_typeResolverType))('AppResolves', true), _typeResolverType),
DEFER: canonicalize.reduce(new (vdl.registry.lookupOrCreateConstructor(_typeResolverType))('Defer', true), _typeResolverType),
};
-module.exports.ResumeMarker = (vdl.registry.lookupOrCreateConstructor(_typeResumeMarker));
module.exports.SchemaMetadata = (vdl.registry.lookupOrCreateConstructor(_typeSchemaMetadata));
+module.exports.StoreChange = (vdl.registry.lookupOrCreateConstructor(_typeStoreChange));
module.exports.SyncGroupMemberInfo = (vdl.registry.lookupOrCreateConstructor(_typeSyncGroupMemberInfo));
module.exports.SyncGroupSpec = (vdl.registry.lookupOrCreateConstructor(_typeSyncGroupSpec));
-module.exports.TablePrefixRange = (vdl.registry.lookupOrCreateConstructor(_typeTablePrefixRange));
-module.exports.WatchRequest = (vdl.registry.lookupOrCreateConstructor(_typeWatchRequest));
@@ -203,6 +173,18 @@
]);
+module.exports.ConcurrentBatchError = makeError('v.io/syncbase/v23/services/syncbase/nosql.ConcurrentBatch', actions.NO_RETRY, {
+ 'en': '{1:}{2:} concurrent batch',
+}, [
+]);
+
+
+module.exports.SchemaVersionMismatchError = makeError('v.io/syncbase/v23/services/syncbase/nosql.SchemaVersionMismatch', actions.NO_RETRY, {
+ 'en': '{1:}{2:} actual schema version does not match the provided one',
+}, [
+]);
+
+
// Services:
@@ -214,13 +196,13 @@
-DatabaseWatcher.prototype.watch = function(ctx, serverCall, req) {
- throw new Error('Method Watch not implemented');
+DatabaseWatcher.prototype.getResumeMarker = function(ctx, serverCall) {
+ throw new Error('Method GetResumeMarker not implemented');
};
-DatabaseWatcher.prototype.getResumeMarker = function(ctx, serverCall) {
- throw new Error('Method GetResumeMarker not implemented');
+DatabaseWatcher.prototype.watchGlob = function(ctx, serverCall, req) {
+ throw new Error('Method WatchGlob not implemented');
};
@@ -228,18 +210,39 @@
DatabaseWatcher.prototype._serviceDescription = {
name: 'DatabaseWatcher',
pkgPath: 'v.io/syncbase/v23/services/syncbase/nosql',
- doc: "// Watch allows a client to watch for updates in the database. For each watched\n// request, the client will receive a reliable stream of watch events without\n// re-ordering.\n//\n// The watching is done by starting a streaming RPC. The argument to the RPC\n// contains the ResumeMarker that points to a particular place in the database\n// event log and a set of (table, row prefix) pairs. Updates with rows not\n// covered by the set are excluded from the result stream. The result stream\n// consists of a never-ending sequence of Change messages (until the call fails\n// or is canceled). Each Change message contains an optional continued bit.\n// A sub-sequence of Change messages with continued=true followed by a Change\n// message with continued=false forms a batch. If the client has no access to\n// a row specified in a change, that change is excluded from the result stream.\n//\n// See \"v.io/v23/services/watch\".GlobWatcher for more detailed explanation of\n// the general behavior.\n//\n// The DatabaseWatcher is designed to be used in the following way:\n// 1) begin a read-only batch\n// 2) read all information your app needs\n// 3) read the ResumeMarker\n// 4) abort the batch\n// 5) start watching changes to the data using the ResumeMarker\n// In this configuration the client doesn't miss any changes.",
- embeds: [],
+ doc: "// Watch allows a client to watch for updates in the database. For each watched\n// request, the client will receive a reliable stream of watch events without\n// re-ordering. See watch.GlobWatcher for a detailed explanation of the\n// behavior.\n//\n// The watching is done by starting a streaming RPC. The argument to the RPC\n// contains the ResumeMarker that points to a particular place in the database\n// event log. The result stream consists of a never-ending sequence of Change\n// messages (until the call fails or is canceled). Each Change contains the\n// Name field in the form \"<tableName>/<rowKey>\" and the Value field of the\n// StoreChange type. If the client has no access to a row specified in a change,\n// that change is excluded from the result stream.\n//\n// The DatabaseWatcher is designed to be used in the following way:\n// 1) begin a read-only batch\n// 2) read all information your app needs\n// 3) read the ResumeMarker\n// 4) abort the batch\n// 5) start watching changes to the data using the ResumeMarker\n// In this configuration the client doesn't miss any changes.",
+ embeds: [{
+ name: 'GlobWatcher',
+ pkgPath: 'v.io/v23/services/watch',
+ doc: "// GlobWatcher allows a client to receive updates for changes to objects\n// that match a pattern. See the package comments for details."
+ },
+ ],
methods: [
{
- name: 'Watch',
- doc: "// Watch returns a stream of changes for a given watch request. If this\n// Database is bound to a batch, Watch() will fail with ErrBoundToBatch.\n// If the provided ResumeMarker is invalid or outdated, Watch() will fail\n// with \"v.io/v23/services/watch\".ErrUnknownResumeMarker.",
+ name: 'GetResumeMarker',
+ doc: "// GetResumeMarker returns the ResumeMarker that points to the current end\n// of the event log. GetResumeMarker() can be called on a batch.",
+ inArgs: [],
+ outArgs: [{
+ name: '',
+ doc: "",
+ type: new watch.ResumeMarker()._type
+ },
+ ],
+ inStream: null,
+ outStream: null,
+ tags: [canonicalize.reduce(new access.Tag("Read", true), new access.Tag()._type), ]
+ },
+
+
+ {
+ name: 'WatchGlob',
+ doc: "// WatchGlob returns a stream of changes that match a pattern.",
inArgs: [{
name: 'req',
doc: "",
- type: _typeWatchRequest
+ type: new watch.GlobRequest()._type
},
],
outArgs: [],
@@ -247,25 +250,9 @@
outStream: {
name: '',
doc: '',
- type: _typeChange
+ type: new watch.Change()._type
},
- tags: [canonicalize.reduce(new access.Tag("Read", true), new access.Tag()._type), ]
- },
-
-
- {
- name: 'GetResumeMarker',
- doc: "// GetResumeMarker returns the ResumeMarker that points to the current end\n// of the current event log. GetResumeMarker() can be called on a batch.",
- inArgs: [],
- outArgs: [{
- name: '',
- doc: "",
- type: _typeResumeMarker
- },
- ],
- inStream: null,
- outStream: null,
- tags: [canonicalize.reduce(new access.Tag("Read", true), new access.Tag()._type), ]
+ tags: [canonicalize.reduce(new access.Tag("Resolve", true), new access.Tag()._type), ]
},
]
@@ -339,7 +326,7 @@
outArgs: [{
name: '',
doc: "",
- type: _type3
+ type: _type1
},
],
inStream: null,
@@ -517,7 +504,7 @@
outArgs: [{
name: 'members',
doc: "",
- type: _type4
+ type: _type2
},
],
inStream: null,
@@ -623,7 +610,7 @@
inStream: {
name: '',
doc: '',
- type: _type2
+ type: _type3
},
outStream: null,
tags: [canonicalize.reduce(new access.Tag("Write", true), new access.Tag()._type), ]
@@ -658,7 +645,7 @@
outArgs: [{
name: '',
doc: "",
- type: vdl.types.UINT64
+ type: vdl.types.INT64
},
],
inStream: null,
@@ -694,7 +681,7 @@
{
name: 'offset',
doc: "",
- type: vdl.types.UINT64
+ type: vdl.types.INT64
},
],
outArgs: [],
@@ -702,7 +689,7 @@
outStream: {
name: '',
doc: '',
- type: _type2
+ type: _type3
},
tags: [canonicalize.reduce(new access.Tag("Read", true), new access.Tag()._type), ]
},
@@ -855,37 +842,37 @@
-Database.prototype.create = function(ctx, serverCall, perms, metadata) {
+Database.prototype.create = function(ctx, serverCall, metadata, perms) {
throw new Error('Method Create not implemented');
};
-Database.prototype.delete = function(ctx, serverCall) {
+Database.prototype.delete = function(ctx, serverCall, schemaVersion) {
throw new Error('Method Delete not implemented');
};
-Database.prototype.exists = function(ctx, serverCall) {
+Database.prototype.exists = function(ctx, serverCall, schemaVersion) {
throw new Error('Method Exists not implemented');
};
-Database.prototype.beginBatch = function(ctx, serverCall, bo) {
+Database.prototype.beginBatch = function(ctx, serverCall, schemaVersion, bo) {
throw new Error('Method BeginBatch not implemented');
};
-Database.prototype.commit = function(ctx, serverCall) {
+Database.prototype.commit = function(ctx, serverCall, schemaVersion) {
throw new Error('Method Commit not implemented');
};
-Database.prototype.exec = function(ctx, serverCall, query) {
+Database.prototype.exec = function(ctx, serverCall, schemaVersion, query) {
throw new Error('Method Exec not implemented');
};
-Database.prototype.abort = function(ctx, serverCall) {
+Database.prototype.abort = function(ctx, serverCall, schemaVersion) {
throw new Error('Method Abort not implemented');
};
@@ -900,13 +887,13 @@
};
-Database.prototype.watch = function(ctx, serverCall, req) {
- throw new Error('Method Watch not implemented');
+Database.prototype.getResumeMarker = function(ctx, serverCall) {
+ throw new Error('Method GetResumeMarker not implemented');
};
-Database.prototype.getResumeMarker = function(ctx, serverCall) {
- throw new Error('Method GetResumeMarker not implemented');
+Database.prototype.watchGlob = function(ctx, serverCall, req) {
+ throw new Error('Method WatchGlob not implemented');
};
@@ -1019,7 +1006,7 @@
Database.prototype._serviceDescription = {
name: 'Database',
pkgPath: 'v.io/syncbase/v23/services/syncbase/nosql',
- doc: "// Database represents a collection of Tables. Batches, queries, sync, watch,\n// etc. all operate at the Database level.\n// Database.Glob operates over Table names.\n//\n// TODO(sadovsky): Add Watch method.",
+ doc: "// Database represents a collection of Tables. Batches, queries, sync, watch,\n// etc. all operate at the Database level.\n// Database.Glob operates over Table names.\n// Param schemaVersion is the version number that the client expects the database\n// to be at. To disable schema version checking, pass -1.\n//\n// TODO(sadovsky): Add Watch method.",
embeds: [{
name: 'Object',
pkgPath: 'v.io/v23/services/permissions',
@@ -1028,7 +1015,7 @@
{
name: 'DatabaseWatcher',
pkgPath: 'v.io/syncbase/v23/services/syncbase/nosql',
- doc: "// Watch allows a client to watch for updates in the database. For each watched\n// request, the client will receive a reliable stream of watch events without\n// re-ordering.\n//\n// The watching is done by starting a streaming RPC. The argument to the RPC\n// contains the ResumeMarker that points to a particular place in the database\n// event log and a set of (table, row prefix) pairs. Updates with rows not\n// covered by the set are excluded from the result stream. The result stream\n// consists of a never-ending sequence of Change messages (until the call fails\n// or is canceled). Each Change message contains an optional continued bit.\n// A sub-sequence of Change messages with continued=true followed by a Change\n// message with continued=false forms a batch. If the client has no access to\n// a row specified in a change, that change is excluded from the result stream.\n//\n// See \"v.io/v23/services/watch\".GlobWatcher for more detailed explanation of\n// the general behavior.\n//\n// The DatabaseWatcher is designed to be used in the following way:\n// 1) begin a read-only batch\n// 2) read all information your app needs\n// 3) read the ResumeMarker\n// 4) abort the batch\n// 5) start watching changes to the data using the ResumeMarker\n// In this configuration the client doesn't miss any changes."
+ doc: "// Watch allows a client to watch for updates in the database. For each watched\n// request, the client will receive a reliable stream of watch events without\n// re-ordering. See watch.GlobWatcher for a detailed explanation of the\n// behavior.\n//\n// The watching is done by starting a streaming RPC. The argument to the RPC\n// contains the ResumeMarker that points to a particular place in the database\n// event log. The result stream consists of a never-ending sequence of Change\n// messages (until the call fails or is canceled). Each Change contains the\n// Name field in the form \"<tableName>/<rowKey>\" and the Value field of the\n// StoreChange type. If the client has no access to a row specified in a change,\n// that change is excluded from the result stream.\n//\n// The DatabaseWatcher is designed to be used in the following way:\n// 1) begin a read-only batch\n// 2) read all information your app needs\n// 3) read the ResumeMarker\n// 4) abort the batch\n// 5) start watching changes to the data using the ResumeMarker\n// In this configuration the client doesn't miss any changes."
},
{
name: 'SyncGroupManager',
@@ -1053,15 +1040,15 @@
name: 'Create',
doc: "// Create creates this Database.\n// If perms is nil, we inherit (copy) the App perms.\n// Create requires the caller to have Write permission at the App.",
inArgs: [{
+ name: 'metadata',
+ doc: "",
+ type: _type5
+ },
+ {
name: 'perms',
doc: "",
type: new access.Permissions()._type
},
- {
- name: 'metadata',
- doc: "",
- type: _type6
- },
],
outArgs: [],
inStream: null,
@@ -1073,7 +1060,12 @@
{
name: 'Delete',
doc: "// Delete deletes this Database.",
- inArgs: [],
+ inArgs: [{
+ name: 'schemaVersion',
+ doc: "",
+ type: vdl.types.INT32
+ },
+ ],
outArgs: [],
inStream: null,
outStream: null,
@@ -1084,7 +1076,12 @@
{
name: 'Exists',
doc: "// Exists returns true only if this Database exists. Insufficient permissions\n// cause Exists to return false instead of an error.\n// TODO(ivanpi): Exists may fail with an error if higher levels of hierarchy\n// do not exist.",
- inArgs: [],
+ inArgs: [{
+ name: 'schemaVersion',
+ doc: "",
+ type: vdl.types.INT32
+ },
+ ],
outArgs: [{
name: '',
doc: "",
@@ -1101,6 +1098,11 @@
name: 'BeginBatch',
doc: "// BeginBatch creates a new batch. It returns an App-relative name for a\n// Database handle bound to this batch. If this Database is already bound to a\n// batch, BeginBatch() will fail with ErrBoundToBatch. Concurrency semantics\n// are documented in model.go.\n// TODO(sadovsky): make BatchOptions optional",
inArgs: [{
+ name: 'schemaVersion',
+ doc: "",
+ type: vdl.types.INT32
+ },
+ {
name: 'bo',
doc: "",
type: _typeBatchOptions
@@ -1121,7 +1123,12 @@
{
name: 'Commit',
doc: "// Commit persists the pending changes to the database.\n// If this Database is not bound to a batch, Commit() will fail with\n// ErrNotBoundToBatch.",
- inArgs: [],
+ inArgs: [{
+ name: 'schemaVersion',
+ doc: "",
+ type: vdl.types.INT32
+ },
+ ],
outArgs: [],
inStream: null,
outStream: null,
@@ -1133,6 +1140,11 @@
name: 'Exec',
doc: "// Exec executes a syncQL query and returns all results as specified by in the\n// query's select clause. Concurrency semantics are documented in model.go.",
inArgs: [{
+ name: 'schemaVersion',
+ doc: "",
+ type: vdl.types.INT32
+ },
+ {
name: 'query',
doc: "",
type: vdl.types.STRING
@@ -1143,7 +1155,7 @@
outStream: {
name: '',
doc: '',
- type: _type7
+ type: _type6
},
tags: [canonicalize.reduce(new access.Tag("Read", true), new access.Tag()._type), ]
},
@@ -1152,7 +1164,12 @@
{
name: 'Abort',
doc: "// Abort notifies the server that any pending changes can be discarded.\n// It is not strictly required, but it may allow the server to release locks\n// or other resources sooner than if it was not called.\n// If this Database is not bound to a batch, Abort() will fail with\n// ErrNotBoundToBatch.",
- inArgs: [],
+ inArgs: [{
+ name: 'schemaVersion',
+ doc: "",
+ type: vdl.types.INT32
+ },
+ ],
outArgs: [],
inStream: null,
outStream: null,
@@ -1203,12 +1220,28 @@
{
- name: 'Watch',
- doc: "// Watch returns a stream of changes for a given watch request. If this\n// Database is bound to a batch, Watch() will fail with ErrBoundToBatch.\n// If the provided ResumeMarker is invalid or outdated, Watch() will fail\n// with \"v.io/v23/services/watch\".ErrUnknownResumeMarker.",
+ name: 'GetResumeMarker',
+ doc: "// GetResumeMarker returns the ResumeMarker that points to the current end\n// of the event log. GetResumeMarker() can be called on a batch.",
+ inArgs: [],
+ outArgs: [{
+ name: '',
+ doc: "",
+ type: new watch.ResumeMarker()._type
+ },
+ ],
+ inStream: null,
+ outStream: null,
+ tags: [canonicalize.reduce(new access.Tag("Read", true), new access.Tag()._type), ]
+ },
+
+
+ {
+ name: 'WatchGlob',
+ doc: "// WatchGlob returns a stream of changes that match a pattern.",
inArgs: [{
name: 'req',
doc: "",
- type: _typeWatchRequest
+ type: new watch.GlobRequest()._type
},
],
outArgs: [],
@@ -1216,25 +1249,9 @@
outStream: {
name: '',
doc: '',
- type: _typeChange
+ type: new watch.Change()._type
},
- tags: [canonicalize.reduce(new access.Tag("Read", true), new access.Tag()._type), ]
- },
-
-
- {
- name: 'GetResumeMarker',
- doc: "// GetResumeMarker returns the ResumeMarker that points to the current end\n// of the current event log. GetResumeMarker() can be called on a batch.",
- inArgs: [],
- outArgs: [{
- name: '',
- doc: "",
- type: _typeResumeMarker
- },
- ],
- inStream: null,
- outStream: null,
- tags: [canonicalize.reduce(new access.Tag("Read", true), new access.Tag()._type), ]
+ tags: [canonicalize.reduce(new access.Tag("Resolve", true), new access.Tag()._type), ]
},
@@ -1245,7 +1262,7 @@
outArgs: [{
name: '',
doc: "",
- type: _type3
+ type: _type1
},
],
inStream: null,
@@ -1423,7 +1440,7 @@
outArgs: [{
name: 'members',
doc: "",
- type: _type4
+ type: _type2
},
],
inStream: null,
@@ -1461,7 +1478,7 @@
inStream: {
name: '',
doc: '',
- type: _type2
+ type: _type3
},
outStream: null,
tags: [canonicalize.reduce(new access.Tag("Write", true), new access.Tag()._type), ]
@@ -1496,7 +1513,7 @@
outArgs: [{
name: '',
doc: "",
- type: vdl.types.UINT64
+ type: vdl.types.INT64
},
],
inStream: null,
@@ -1532,7 +1549,7 @@
{
name: 'offset',
doc: "",
- type: vdl.types.UINT64
+ type: vdl.types.INT64
},
],
outArgs: [],
@@ -1540,7 +1557,7 @@
outStream: {
name: '',
doc: '',
- type: _type2
+ type: _type3
},
tags: [canonicalize.reduce(new access.Tag("Read", true), new access.Tag()._type), ]
},
@@ -1665,42 +1682,42 @@
-Table.prototype.create = function(ctx, serverCall, perms) {
+Table.prototype.create = function(ctx, serverCall, schemaVersion, perms) {
throw new Error('Method Create not implemented');
};
-Table.prototype.delete = function(ctx, serverCall) {
+Table.prototype.delete = function(ctx, serverCall, schemaVersion) {
throw new Error('Method Delete not implemented');
};
-Table.prototype.exists = function(ctx, serverCall) {
+Table.prototype.exists = function(ctx, serverCall, schemaVersion) {
throw new Error('Method Exists not implemented');
};
-Table.prototype.deleteRowRange = function(ctx, serverCall, start, limit) {
+Table.prototype.deleteRowRange = function(ctx, serverCall, schemaVersion, start, limit) {
throw new Error('Method DeleteRowRange not implemented');
};
-Table.prototype.scan = function(ctx, serverCall, start, limit) {
+Table.prototype.scan = function(ctx, serverCall, schemaVersion, start, limit) {
throw new Error('Method Scan not implemented');
};
-Table.prototype.getPermissions = function(ctx, serverCall, key) {
+Table.prototype.getPermissions = function(ctx, serverCall, schemaVersion, key) {
throw new Error('Method GetPermissions not implemented');
};
-Table.prototype.setPermissions = function(ctx, serverCall, prefix, perms) {
+Table.prototype.setPermissions = function(ctx, serverCall, schemaVersion, prefix, perms) {
throw new Error('Method SetPermissions not implemented');
};
-Table.prototype.deletePermissions = function(ctx, serverCall, prefix) {
+Table.prototype.deletePermissions = function(ctx, serverCall, schemaVersion, prefix) {
throw new Error('Method DeletePermissions not implemented');
};
@@ -1709,7 +1726,7 @@
Table.prototype._serviceDescription = {
name: 'Table',
pkgPath: 'v.io/syncbase/v23/services/syncbase/nosql',
- doc: "// Table represents a collection of Rows.\n// Table.Glob operates over the primary keys of Rows in the Table.",
+ doc: "// Table represents a collection of Rows.\n// Table.Glob operates over the primary keys of Rows in the Table.\n// SchemaVersion is the version number that the client expects the database\n// to be at. To disable schema version checking, pass -1.",
embeds: [],
methods: [
@@ -1718,6 +1735,11 @@
name: 'Create',
doc: "// Create creates this Table.\n// If perms is nil, we inherit (copy) the Database perms.",
inArgs: [{
+ name: 'schemaVersion',
+ doc: "",
+ type: vdl.types.INT32
+ },
+ {
name: 'perms',
doc: "",
type: new access.Permissions()._type
@@ -1733,7 +1755,12 @@
{
name: 'Delete',
doc: "// Delete deletes this Table.",
- inArgs: [],
+ inArgs: [{
+ name: 'schemaVersion',
+ doc: "",
+ type: vdl.types.INT32
+ },
+ ],
outArgs: [],
inStream: null,
outStream: null,
@@ -1744,7 +1771,12 @@
{
name: 'Exists',
doc: "// Exists returns true only if this Table exists. Insufficient permissions\n// cause Exists to return false instead of an error.\n// TODO(ivanpi): Exists may fail with an error if higher levels of hierarchy\n// do not exist.",
- inArgs: [],
+ inArgs: [{
+ name: 'schemaVersion',
+ doc: "",
+ type: vdl.types.INT32
+ },
+ ],
outArgs: [{
name: '',
doc: "",
@@ -1761,14 +1793,19 @@
name: 'DeleteRowRange',
doc: "// Delete deletes all rows in the given half-open range [start, limit). If\n// limit is \"\", all rows with keys >= start are included.",
inArgs: [{
+ name: 'schemaVersion',
+ doc: "",
+ type: vdl.types.INT32
+ },
+ {
name: 'start',
doc: "",
- type: _type2
+ type: _type3
},
{
name: 'limit',
doc: "",
- type: _type2
+ type: _type3
},
],
outArgs: [],
@@ -1782,14 +1819,19 @@
name: 'Scan',
doc: "// Scan returns all rows in the given half-open range [start, limit). If limit\n// is \"\", all rows with keys >= start are included. Concurrency semantics are\n// documented in model.go.",
inArgs: [{
+ name: 'schemaVersion',
+ doc: "",
+ type: vdl.types.INT32
+ },
+ {
name: 'start',
doc: "",
- type: _type2
+ type: _type3
},
{
name: 'limit',
doc: "",
- type: _type2
+ type: _type3
},
],
outArgs: [],
@@ -1807,6 +1849,11 @@
name: 'GetPermissions',
doc: "// GetPermissions returns an array of (prefix, perms) pairs. The array is\n// sorted from longest prefix to shortest, so element zero is the one that\n// applies to the row with the given key. The last element is always the\n// prefix \"\" which represents the table's permissions -- the array will always\n// have at least one element.",
inArgs: [{
+ name: 'schemaVersion',
+ doc: "",
+ type: vdl.types.INT32
+ },
+ {
name: 'key',
doc: "",
type: vdl.types.STRING
@@ -1815,7 +1862,7 @@
outArgs: [{
name: '',
doc: "",
- type: _type8
+ type: _type7
},
],
inStream: null,
@@ -1828,6 +1875,11 @@
name: 'SetPermissions',
doc: "// SetPermissions sets the permissions for all current and future rows with\n// the given prefix. If the prefix overlaps with an existing prefix, the\n// longest prefix that matches a row applies. For example:\n// SetPermissions(ctx, Prefix(\"a/b\"), perms1)\n// SetPermissions(ctx, Prefix(\"a/b/c\"), perms2)\n// The permissions for row \"a/b/1\" are perms1, and the permissions for row\n// \"a/b/c/1\" are perms2.",
inArgs: [{
+ name: 'schemaVersion',
+ doc: "",
+ type: vdl.types.INT32
+ },
+ {
name: 'prefix',
doc: "",
type: vdl.types.STRING
@@ -1849,6 +1901,11 @@
name: 'DeletePermissions',
doc: "// DeletePermissions deletes the permissions for the specified prefix. Any\n// rows covered by this prefix will use the next longest prefix's permissions\n// (see the array returned by GetPermissions).",
inArgs: [{
+ name: 'schemaVersion',
+ doc: "",
+ type: vdl.types.INT32
+ },
+ {
name: 'prefix',
doc: "",
type: vdl.types.STRING
@@ -1870,22 +1927,22 @@
-Row.prototype.exists = function(ctx, serverCall) {
+Row.prototype.exists = function(ctx, serverCall, schemaVersion) {
throw new Error('Method Exists not implemented');
};
-Row.prototype.get = function(ctx, serverCall) {
+Row.prototype.get = function(ctx, serverCall, schemaVersion) {
throw new Error('Method Get not implemented');
};
-Row.prototype.put = function(ctx, serverCall, value) {
+Row.prototype.put = function(ctx, serverCall, schemaVersion, value) {
throw new Error('Method Put not implemented');
};
-Row.prototype.delete = function(ctx, serverCall) {
+Row.prototype.delete = function(ctx, serverCall, schemaVersion) {
throw new Error('Method Delete not implemented');
};
@@ -1894,7 +1951,7 @@
Row.prototype._serviceDescription = {
name: 'Row',
pkgPath: 'v.io/syncbase/v23/services/syncbase/nosql',
- doc: "// Row represents a single row in a Table.\n// All access checks are performed against the most specific matching prefix\n// permissions in the Table.\n// NOTE(sadovsky): Currently we send []byte values over the wire for Get, Put,\n// and Scan. If there's a way to avoid encoding/decoding on the server side, we\n// can use vdl.Value everywhere without sacrificing performance.",
+ doc: "// Row represents a single row in a Table.\n// All access checks are performed against the most specific matching prefix\n// permissions in the Table.\n// SchemaVersion is the version number that the client expects the database\n// to be at. To disable schema version checking, pass -1.\n// NOTE(sadovsky): Currently we send []byte values over the wire for Get, Put,\n// and Scan. If there's a way to avoid encoding/decoding on the server side, we\n// can use vdl.Value everywhere without sacrificing performance.",
embeds: [],
methods: [
@@ -1902,7 +1959,12 @@
{
name: 'Exists',
doc: "// Exists returns true only if this Row exists. Insufficient permissions\n// cause Exists to return false instead of an error.\n// TODO(ivanpi): Exists may fail with an error if higher levels of hierarchy\n// do not exist.",
- inArgs: [],
+ inArgs: [{
+ name: 'schemaVersion',
+ doc: "",
+ type: vdl.types.INT32
+ },
+ ],
outArgs: [{
name: '',
doc: "",
@@ -1918,11 +1980,16 @@
{
name: 'Get',
doc: "// Get returns the value for this Row.",
- inArgs: [],
+ inArgs: [{
+ name: 'schemaVersion',
+ doc: "",
+ type: vdl.types.INT32
+ },
+ ],
outArgs: [{
name: '',
doc: "",
- type: _type2
+ type: _type3
},
],
inStream: null,
@@ -1935,9 +2002,14 @@
name: 'Put',
doc: "// Put writes the given value for this Row.",
inArgs: [{
+ name: 'schemaVersion',
+ doc: "",
+ type: vdl.types.INT32
+ },
+ {
name: 'value',
doc: "",
- type: _type2
+ type: _type3
},
],
outArgs: [],
@@ -1950,7 +2022,12 @@
{
name: 'Delete',
doc: "// Delete deletes this Row.",
- inArgs: [],
+ inArgs: [{
+ name: 'schemaVersion',
+ doc: "",
+ type: vdl.types.INT32
+ },
+ ],
outArgs: [],
inStream: null,
outStream: null,
diff --git a/src/gen-vdl/v.io/v23/security/index.js b/src/gen-vdl/v.io/v23/security/index.js
index 4142766..5221584 100644
--- a/src/gen-vdl/v.io/v23/security/index.js
+++ b/src/gen-vdl/v.io/v23/security/index.js
@@ -279,6 +279,12 @@
module.exports.SignatureForDischarge = canonicalize.reduce(new (vdl.registry.lookupOrCreateConstructor(vdl.types.STRING))("D", true), vdl.types.STRING);
+ module.exports.SignatureForMessageSigningV1 = canonicalize.reduce(new (vdl.registry.lookupOrCreateConstructor(vdl.types.STRING))("S1", true), vdl.types.STRING);
+
+ module.exports.SignatureForBlessingCertificatesV1 = canonicalize.reduce(new (vdl.registry.lookupOrCreateConstructor(vdl.types.STRING))("B1", true), vdl.types.STRING);
+
+ module.exports.SignatureForDischargeV1 = canonicalize.reduce(new (vdl.registry.lookupOrCreateConstructor(vdl.types.STRING))("D1", true), vdl.types.STRING);
+
// Errors:
@@ -369,6 +375,13 @@
]);
+module.exports.InvalidSigningBlessingCaveatError = makeError('v.io/v23/security.InvalidSigningBlessingCaveat', actions.NO_RETRY, {
+ 'en': '{1:}{2:} blessing has caveat with UUID {3} which makes it unsuitable for signing -- please use blessings with just Expiry caveats',
+}, [
+ new uniqueid.Id()._type,
+]);
+
+
// Services:
diff --git a/src/gen-vdl/v.io/v23/services/watch/index.js b/src/gen-vdl/v.io/v23/services/watch/index.js
new file mode 100644
index 0000000..426a80c
--- /dev/null
+++ b/src/gen-vdl/v.io/v23/services/watch/index.js
@@ -0,0 +1,112 @@
+// Copyright 2015 The Vanadium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+// This file was auto-generated by the vanadium vdl tool.
+var vdl = require('vanadium').vdl;
+var makeError = require('vanadium').verror.makeError;
+var actions = require('vanadium').verror.actions;
+var canonicalize = require('vanadium').vdl.canonicalize;
+
+
+
+
+
+var access = require('./../../security/access');
+
+module.exports = {};
+
+
+
+// Types:
+var _typeChange = new vdl.Type();
+var _typeGlobRequest = new vdl.Type();
+var _typeResumeMarker = new vdl.Type();
+_typeChange.kind = vdl.kind.STRUCT;
+_typeChange.name = "v.io/v23/services/watch.Change";
+_typeChange.fields = [{name: "Name", type: vdl.types.STRING}, {name: "State", type: vdl.types.INT32}, {name: "Value", type: vdl.types.ANY}, {name: "ResumeMarker", type: _typeResumeMarker}, {name: "Continued", type: vdl.types.BOOL}];
+_typeGlobRequest.kind = vdl.kind.STRUCT;
+_typeGlobRequest.name = "v.io/v23/services/watch.GlobRequest";
+_typeGlobRequest.fields = [{name: "Pattern", type: vdl.types.STRING}, {name: "ResumeMarker", type: _typeResumeMarker}];
+_typeResumeMarker.kind = vdl.kind.LIST;
+_typeResumeMarker.name = "v.io/v23/services/watch.ResumeMarker";
+_typeResumeMarker.elem = vdl.types.BYTE;
+_typeChange.freeze();
+_typeGlobRequest.freeze();
+_typeResumeMarker.freeze();
+module.exports.Change = (vdl.registry.lookupOrCreateConstructor(_typeChange));
+module.exports.GlobRequest = (vdl.registry.lookupOrCreateConstructor(_typeGlobRequest));
+module.exports.ResumeMarker = (vdl.registry.lookupOrCreateConstructor(_typeResumeMarker));
+
+
+
+
+// Consts:
+
+ module.exports.Exists = canonicalize.reduce(new (vdl.registry.lookupOrCreateConstructor(vdl.types.INT32))(0, true), vdl.types.INT32);
+
+ module.exports.DoesNotExist = canonicalize.reduce(new (vdl.registry.lookupOrCreateConstructor(vdl.types.INT32))(1, true), vdl.types.INT32);
+
+ module.exports.InitialStateSkipped = canonicalize.reduce(new (vdl.registry.lookupOrCreateConstructor(vdl.types.INT32))(2, true), vdl.types.INT32);
+
+
+
+// Errors:
+
+module.exports.UnknownResumeMarkerError = makeError('v.io/v23/services/watch.UnknownResumeMarker', actions.NO_RETRY, {
+ 'en': '{1:}{2:} unknown resume marker {_}',
+}, [
+]);
+
+
+
+
+// Services:
+
+
+
+function GlobWatcher(){}
+module.exports.GlobWatcher = GlobWatcher;
+
+
+
+GlobWatcher.prototype.watchGlob = function(ctx, serverCall, req) {
+ throw new Error('Method WatchGlob not implemented');
+};
+
+
+
+GlobWatcher.prototype._serviceDescription = {
+ name: 'GlobWatcher',
+ pkgPath: 'v.io/v23/services/watch',
+ doc: "// GlobWatcher allows a client to receive updates for changes to objects\n// that match a pattern. See the package comments for details.",
+ embeds: [],
+ methods: [
+
+
+ {
+ name: 'WatchGlob',
+ doc: "// WatchGlob returns a stream of changes that match a pattern.",
+ inArgs: [{
+ name: 'req',
+ doc: "",
+ type: _typeGlobRequest
+ },
+ ],
+ outArgs: [],
+ inStream: null,
+ outStream: {
+ name: '',
+ doc: '',
+ type: _typeChange
+ },
+ tags: [canonicalize.reduce(new access.Tag("Resolve", true), new access.Tag()._type), ]
+ },
+
+ ]
+};
+
+
+
+
+
diff --git a/src/nosql/batch-database.js b/src/nosql/batch-database.js
index 7fac2d6..2329dde 100644
--- a/src/nosql/batch-database.js
+++ b/src/nosql/batch-database.js
@@ -14,12 +14,14 @@
* @constructor
* @inner
* @param {module:syncbase.database.Database} db Database.
+ * @param {number} schemaVersion Database schema version expected by client.
*/
-function BatchDatabase(db) {
+function BatchDatabase(db, schemaVersion) {
if (!(this instanceof BatchDatabase)) {
- return new BatchDatabase(db);
+ return new BatchDatabase(db, schemaVersion);
}
+ this.schemaVersion = schemaVersion;
Object.defineProperty(this, '_db', {
enumerable: false,
value: db,
@@ -51,7 +53,7 @@
* @param {function} cb Callback.
*/
BatchDatabase.prototype.commit = function(ctx, cb) {
- this._db._wire(ctx).commit(ctx, cb);
+ this._db._wire(ctx).commit(ctx, this.schemaVersion, cb);
};
/**
@@ -62,7 +64,7 @@
* @param {function} cb Callback.
*/
BatchDatabase.prototype.abort = function(ctx, cb) {
- this._db._wire(ctx).abort(ctx, cb);
+ this._db._wire(ctx).abort(ctx, this.schemaVersion, cb);
};
/**
diff --git a/src/nosql/batch.js b/src/nosql/batch.js
index cd5fa36..157a21d 100644
--- a/src/nosql/batch.js
+++ b/src/nosql/batch.js
@@ -8,7 +8,7 @@
* @summary
* runInBatch runs a function with a newly created batch. If the function
* errors, the batch is aborted. If the function succeeds, the batch is
- * committed. If an error occurs during commit, then the batch is aborted.
+ * committed.
*
* @param {module:vanadium.context.Context} ctx Vanadium context.
* @param {module:syncbase.database.Database} db Database.
@@ -17,41 +17,39 @@
* batch.
* @param {module:vanadium~voidCb} cb Callback that will be called after the
* batch has been committed or aborted.
- *
- * TODO(nlacasse): Add retry loop.
*/
function runInBatch(ctx, db, opts, fn, cb) {
- db.beginBatch(ctx, opts, function(err, batchDb) {
- if (err) {
- return cb(err);
- }
-
- function onError(err) {
- batchDb.abort(ctx, function() {
- cb(err);
- });
- }
-
- function onSuccess() {
- // TODO(nlacasse): Commit() can fail for a number of reasons, e.g. RPC
- // failure or ErrConcurrentTransaction. Depending on the cause of
- // failure, it may be desirable to retry the Commit() and/or to call
- // Abort(). For now, we always abort on a failed commit.
- batchDb.commit(ctx, function(commitErr) {
- if (commitErr) {
- return onError(commitErr);
- }
- return cb(null);
- });
- }
-
- fn(batchDb, function(err) {
+ function attempt(cb) {
+ db.beginBatch(ctx, opts, function(err, batchDb) {
if (err) {
- return onError(err);
+ return cb(err);
}
- onSuccess();
+ fn(batchDb, function(err) {
+ if (err) {
+ return batchDb.abort(ctx, function() {
+ return cb(err); // return fn error, not abort error
+ });
+ }
+ // TODO(sadovsky): commit() can fail for a number of reasons, e.g. RPC
+ // failure or ErrConcurrentTransaction. Depending on the cause of
+ // failure, it may be desirable to retry the commit() and/or to call
+ // abort().
+ batchDb.commit(ctx, cb);
+ });
});
- });
+ }
+
+ function retryLoop(i) {
+ attempt(function(err) {
+ if (err && i < 2) {
+ retryLoop(i + 1);
+ } else {
+ cb(err);
+ }
+ });
+ }
+
+ retryLoop(0);
}
/**
diff --git a/src/nosql/database.js b/src/nosql/database.js
index 41f5384..4575ac1 100644
--- a/src/nosql/database.js
+++ b/src/nosql/database.js
@@ -24,14 +24,18 @@
* Database.
* @param {string} relativeName Relative name of this Database. Must not
* contain slashes.
+ * @param {module:syncbase.schema.Schema} schema Schema for the database.
+ * TODO(nlacasse): port definition of Schema from go to javascript
*/
-function Database(parentFullName, relativeName) {
+function Database(parentFullName, relativeName, schema) {
if (!(this instanceof Database)) {
return new Database(parentFullName, relativeName);
}
util.addNameProperties(this, parentFullName, relativeName);
+ this.schema = null; // TODO(nlacasse): use schema from params
+ this.schemaVersion = -1; // TODO(nlacasse): derive this from schema
/**
* Caches the database wire object.
* @private
@@ -68,7 +72,8 @@
* https://vanadium-review.googlesource.com/#/c/13040/ .
*/
Database.prototype.create = function(ctx, perms, cb) {
- this._wire(ctx).create(ctx, perms, null, cb);
+ //TODO(nlacasse): pass schema.metadata below instead of null
+ this._wire(ctx).create(ctx, null, perms, cb);
};
/**
@@ -77,7 +82,7 @@
* @param {function} cb Callback.
*/
Database.prototype.delete = function(ctx, cb) {
- this._wire(ctx).delete(ctx, cb);
+ this._wire(ctx).delete(ctx, this.schemaVersion, cb);
};
/**
@@ -89,7 +94,7 @@
* @param {function} cb Callback.
*/
Database.prototype.exists = function(ctx, cb) {
- this._wire(ctx).exists(ctx, cb);
+ this._wire(ctx).exists(ctx, this.schemaVersion, cb);
};
/**
@@ -130,7 +135,7 @@
return cb(null, res.map(unwrap));
});
- var stream = this._wire(ctx).exec(ctx, query, cb).stream;
+ var stream = this._wire(ctx).exec(ctx, this.schemaVersion, query, cb).stream;
var decodedStream = stream.pipe(streamUnwrapper);
stream.on('error', function(err) {
@@ -146,7 +151,7 @@
* @return {module:syncbase.table.Table} Table object.
*/
Database.prototype.table = function(relativeName) {
- return new Table(this.fullName, relativeName);
+ return new Table(this.fullName, relativeName, this.schemaVersion);
};
/**
@@ -194,7 +199,7 @@
* @param {function} cb Callback.
*/
Database.prototype.createTable = function(ctx, relativeName, perms, cb) {
- this._tableWire(ctx, relativeName).create(ctx, perms, cb);
+ this._tableWire(ctx, relativeName).create(ctx, this.schemaVersion, perms, cb);
};
/**
@@ -205,7 +210,7 @@
* @param {function} cb Callback.
*/
Database.prototype.deleteTable = function(ctx, relativeName, cb) {
- this._tableWire(ctx, relativeName).delete(ctx, cb);
+ this._tableWire(ctx, relativeName).delete(ctx, this.schemaVersion, cb);
};
/**
@@ -256,18 +261,19 @@
*/
Database.prototype.beginBatch = function(ctx, opts, cb) {
var self = this;
- this._wire(ctx).beginBatch(ctx, opts, function(err, relativeName) {
- if (err) {
- return cb(err);
- }
+ this._wire(ctx).beginBatch(ctx, this.schemaVersion, opts,
+ function(err, relativeName) {
+ if (err) {
+ return cb(err);
+ }
- // The relativeName returned from the beginBatch() call above is different
- // than the relativeName of the current database. We must create a new
- // Database with this new relativeName, and then create a BatchDatabase
- // from that new Database.
- var db = new Database(self._parentFullName, relativeName);
- return cb(null, new BatchDatabase(db));
- });
+ // The relativeName returned from the beginBatch() call above is different
+ // than the relativeName of the current database. We must create a new
+ // Database with this new relativeName, and then create a BatchDatabase
+ // from that new Database.
+ var db = new Database(self._parentFullName, relativeName);
+ return cb(null, new BatchDatabase(db));
+ });
};
/**
diff --git a/src/nosql/row.js b/src/nosql/row.js
index ba9f633..ae679b1 100644
--- a/src/nosql/row.js
+++ b/src/nosql/row.js
@@ -12,13 +12,14 @@
* @summary
* Represents a single row in a Table.
* Private constructor, use table.row() to get an instance.
+ * @param {number} schemaVersion Database schema version expected by client.
* @inner
* @constructor
* @memberof module:syncbase.nosql
*/
-function Row(parentFullName, key) {
+function Row(parentFullName, key, schemaVersion) {
if (!(this instanceof Row)) {
- return new Row(parentFullName, key);
+ return new Row(parentFullName, key, schemaVersion);
}
// TODO(aghassemi) We may need to escape the key. Align with Go implementation
@@ -27,6 +28,7 @@
// Should they all behave the same or is row key really different?
var fullName = vanadium.naming.join(parentFullName, key);
+ this.schemaVersion = schemaVersion;
/**
* The key of this Row.
* @property name
@@ -82,7 +84,7 @@
* @param {function} cb Callback.
*/
Row.prototype.exists = function(ctx, cb) {
- this._wire(ctx).exists(ctx, cb);
+ this._wire(ctx).exists(ctx, this.schemaVersion, cb);
};
/**
@@ -91,7 +93,7 @@
* @param {function} cb Callback.
*/
Row.prototype.get = function(ctx, cb) {
- this._wire(ctx).get(ctx, function(err, value) {
+ this._wire(ctx).get(ctx, this.schemaVersion, function(err, value) {
if (err) {
return cb(err);
}
@@ -121,7 +123,7 @@
} catch (e) {
return cb(e);
}
- this._wire(ctx).put(ctx, encodedVal, cb);
+ this._wire(ctx).put(ctx, this.schemaVersion, encodedVal, cb);
};
/**
@@ -130,5 +132,5 @@
* @param {function} cb Callback.
*/
Row.prototype.delete = function(ctx, cb) {
- this._wire(ctx).delete(ctx, cb);
+ this._wire(ctx).delete(ctx, this.schemaVersion, cb);
};
diff --git a/src/nosql/table.js b/src/nosql/table.js
index 824f816..96a6e98 100644
--- a/src/nosql/table.js
+++ b/src/nosql/table.js
@@ -21,17 +21,20 @@
* Table.
* @param {string} relativeName Relative name of this Table. Must not
* contain slashes.
+ * @param {number} schemaVersion Database schema version expected by client.
* @constructor
* @inner
* @memberof {module:syncbase.nosql}
*/
-function Table(parentFullName, relativeName) {
+function Table(parentFullName, relativeName, schemaVersion) {
if (!(this instanceof Table)) {
- return new Table(parentFullName, relativeName);
+ return new Table(parentFullName, relativeName, schemaVersion);
}
util.addNameProperties(this, parentFullName, relativeName);
+ this.schemaVersion = schemaVersion;
+
/**
* Caches the table wire object.
* @private
@@ -66,7 +69,7 @@
* @param {function} cb Callback.
*/
Table.prototype.exists = function(ctx, cb) {
- this._wire(ctx).exists(ctx, cb);
+ this._wire(ctx).exists(ctx, this.schemaVersion, cb);
};
/**
@@ -75,7 +78,7 @@
* @return {module:syncbase.row.Row} Row object.
*/
Table.prototype.row = function(key) {
- return new Row(this.fullName, key);
+ return new Row(this.fullName, key, this.schemaVersion);
};
/**
@@ -116,7 +119,8 @@
* @param {function} cb Callback.
*/
Table.prototype.delete = function(ctx, range, cb) {
- this._wire(ctx).deleteRowRange(ctx, range.start, range.limit, cb);
+ this._wire(ctx).deleteRowRange(
+ ctx, this.schemaVersion, range.start, range.limit, cb);
};
/**
@@ -143,7 +147,8 @@
});
});
- var stream = this._wire(ctx).scan(ctx, range.start, range.limit, cb).stream;
+ var stream = this._wire(ctx)
+ .scan(ctx, this.schemaVersion, range.start, range.limit, cb).stream;
var decodedStream = stream.pipe(vomStreamDecoder);
stream.on('error', function(err) {
decodedStream.emit('error', err);
@@ -171,7 +176,8 @@
* @param {function} cb Callback.
*/
Table.prototype.setPermissions = function(ctx, prefix, perms, cb) {
- this._wire(ctx).setPermissions(ctx, stringifyPrefix(prefix), perms, cb);
+ this._wire(ctx).setPermissions(
+ ctx, this.schemaVersion, stringifyPrefix(prefix), perms, cb);
};
@@ -190,20 +196,22 @@
// There are two PrefixPermission types, one is the wire type which has
// Prefix as a string and then there is the client type where prefix is a
// PrefixRange, therefore we convert between the wire and client types.
- this._wire(ctx).getPermissions(ctx, key, function(err, wirePerms) {
- if (err) {
- return cb(err);
- }
+ this._wire(ctx).getPermissions(ctx, this.schemaVersion, key,
+ function(err, wirePerms) {
+ if (err) {
+ return cb(err);
+ }
- var perms = wirePerms.map(function(v) {
- return new PrefixPermissions(
- prefix(v.prefix),
- v.perms
- );
- });
+ var perms = wirePerms.map(function(v) {
+ return new PrefixPermissions(
+ prefix(v.prefix),
+ v.perms
+ );
+ });
- cb(null, perms);
- });
+ cb(null, perms);
+ }
+ );
};
/**
@@ -216,7 +224,8 @@
*/
Table.prototype.deletePermissions = function(ctx, prefix, cb) {
//TODO(aghassemi): Why is prefix a PrefixRange in Go?
- this._wire(ctx).deletePermissions(ctx, stringifyPrefix(prefix), cb);
+ this._wire(ctx).deletePermissions(
+ ctx, this.schemaVersion, stringifyPrefix(prefix), cb);
};
function stringifyPrefix(prefix) {
diff --git a/test/integration/test-batch.js b/test/integration/test-batch.js
index eb2f2cb..2b200d5 100644
--- a/test/integration/test-batch.js
+++ b/test/integration/test-batch.js
@@ -142,8 +142,10 @@
return batch.table(table.name);
});
+ // Put to the same key in each batch.
+ var key = uniqueName('key');
async.mapSeries(batchTables, function(batchTable, cb) {
- var key = uniqueName('key');
+ // Put different value in each batch.
var value = uniqueName('value');
batchTable.put(ctx, key, value, function(err) {
if (err) {
@@ -339,19 +341,33 @@
var db = o.database;
var table = o.table;
- db.beginBatch(ctx, {}, putTable);
+ db.beginBatch(ctx, {}, readBatchTable);
var key = uniqueName('key');
var value = uniqueName('value');
var batch;
+ var batchTable;
- function putTable(err, _batch) {
+ function readBatchTable(err, _batch) {
if (err) {
- return end(err);
+ return t.end(err);
}
batch = _batch;
+ batchTable = batch.table(table.name);
+
+ batchTable.get(ctx, key, function(err) {
+ // Should error because the key does not exist yet.
+ t.ok(err, 'get should error when key does not exist');
+ putTable();
+ });
+ }
+
+ function putTable(err) {
+ if (err) {
+ return end(err);
+ }
// Put on the table directly, not the batch table. This will conflict
// with future batchTable.put() call.
@@ -365,7 +381,6 @@
var newValue = uniqueName('value');
- var batchTable = batch.table(table.name);
batchTable.put(ctx, key, newValue, commit);
}
diff --git a/test/integration/test-run-in-batch.js b/test/integration/test-run-in-batch.js
index 84011f3..cc29d5a 100644
--- a/test/integration/test-run-in-batch.js
+++ b/test/integration/test-run-in-batch.js
@@ -77,7 +77,7 @@
});
});
-test('runInBatch aborts if commit fails', function(t) {
+test('runInBatch does not abort if commit fails', function(t) {
var ctx = {};
var db = new MockDb(true);
@@ -90,7 +90,7 @@
t.ok(db.batchDb, 'batch db is created');
t.ok(db.batchDb.commitCalled, 'batchDb.commit() was called');
- t.ok(db.batchDb.abortCalled, 'batchDb.abort() was called');
+ t.notok(db.batchDb.abortCalled, 'batchDb.abort() was not called');
t.end();
});
diff --git a/test/start-syncbased.sh b/test/start-syncbased.sh
index 6cf9f24..bedc838 100755
--- a/test/start-syncbased.sh
+++ b/test/start-syncbased.sh
@@ -9,7 +9,17 @@
# does not allow flags or arguments to the executables it starts. We should
# fix service-runner to allow flags/arguments, and then have it start syncbased
# directly with the appropriate flags. Then we can delete this file.
-# TODO(rdaoud): how to cleanup the tmp test dir; "rm" here doesn't do it.
-testdir="$(mktemp -d "${TMPDIR:-/tmp}"/sbtest.XXXXXXXX)"
-syncbased -v=1 --name test/syncbased --engine memstore --root-dir "${testdir}" --v23.tcp.address 127.0.0.1:0
+TESTDIR="$(mktemp -d "${TMPDIR:-/tmp}"/sbtest.XXXXXXXX)"
+# Delete TESTDIR and stop syncbased on exit.
+function cleanup {
+ rm -rf "${TESTDIR}"
+ kill -TERM "${CHILD}" 2>/dev/null
+ exit 0
+}
+trap cleanup SIGINT SIGTERM EXIT
+
+syncbased -v=3 --name test/syncbased --engine "${STORAGE_ENGINE:-leveldb}" --root-dir "${TESTDIR}" --v23.tcp.address 127.0.0.1:0 &
+
+CHILD=$!
+wait "${CHILD}"