Implement client watch.
API modeled after the Go api in go/vcl/14421
Change-Id: I658e515876f6049e865b26b353be33b3628bf6b3
diff --git a/src/gen-vdl/v.io/syncbase/v23/services/syncbase/nosql/index.js b/src/gen-vdl/v.io/syncbase/v23/services/syncbase/nosql/index.js
index 1e5f77d..168e415 100644
--- a/src/gen-vdl/v.io/syncbase/v23/services/syncbase/nosql/index.js
+++ b/src/gen-vdl/v.io/syncbase/v23/services/syncbase/nosql/index.js
@@ -29,11 +29,11 @@
var _type6 = new vdl.Type();
var _type7 = new vdl.Type();
var _typeBatchOptions = new vdl.Type();
+var _typeBlobFetchState = new vdl.Type();
+var _typeBlobFetchStatus = new vdl.Type();
var _typeBlobRef = new vdl.Type();
var _typeCrPolicy = new vdl.Type();
var _typeCrRule = new vdl.Type();
-var _typeFetchState = new vdl.Type();
-var _typeFetchStatus = new vdl.Type();
var _typeKeyValue = new vdl.Type();
var _typePrefixPermissions = new vdl.Type();
var _typeResolverType = new vdl.Type();
@@ -66,6 +66,12 @@
_typeBatchOptions.kind = vdl.kind.STRUCT;
_typeBatchOptions.name = "v.io/syncbase/v23/services/syncbase/nosql.BatchOptions";
_typeBatchOptions.fields = [{name: "Hint", type: vdl.types.STRING}, {name: "ReadOnly", type: vdl.types.BOOL}];
+_typeBlobFetchState.kind = vdl.kind.ENUM;
+_typeBlobFetchState.name = "v.io/syncbase/v23/services/syncbase/nosql.BlobFetchState";
+_typeBlobFetchState.labels = ["Pending", "Locating", "Fetching", "Done"];
+_typeBlobFetchStatus.kind = vdl.kind.STRUCT;
+_typeBlobFetchStatus.name = "v.io/syncbase/v23/services/syncbase/nosql.BlobFetchStatus";
+_typeBlobFetchStatus.fields = [{name: "State", type: _typeBlobFetchState}, {name: "Received", type: vdl.types.INT64}, {name: "Total", type: vdl.types.INT64}];
_typeBlobRef.kind = vdl.kind.STRING;
_typeBlobRef.name = "v.io/syncbase/v23/services/syncbase/nosql.BlobRef";
_typeCrPolicy.kind = vdl.kind.STRUCT;
@@ -74,12 +80,6 @@
_typeCrRule.kind = vdl.kind.STRUCT;
_typeCrRule.name = "v.io/syncbase/v23/services/syncbase/nosql.CrRule";
_typeCrRule.fields = [{name: "TableName", type: vdl.types.STRING}, {name: "KeyPrefix", type: vdl.types.STRING}, {name: "Type", type: vdl.types.STRING}, {name: "Resolver", type: _typeResolverType}];
-_typeFetchState.kind = vdl.kind.ENUM;
-_typeFetchState.name = "v.io/syncbase/v23/services/syncbase/nosql.FetchState";
-_typeFetchState.labels = ["Pending", "Locating", "Fetching", "Done"];
-_typeFetchStatus.kind = vdl.kind.STRUCT;
-_typeFetchStatus.name = "v.io/syncbase/v23/services/syncbase/nosql.FetchStatus";
-_typeFetchStatus.fields = [{name: "State", type: _typeFetchState}, {name: "Received", type: vdl.types.UINT64}, {name: "Total", type: vdl.types.UINT64}];
_typeKeyValue.kind = vdl.kind.STRUCT;
_typeKeyValue.name = "v.io/syncbase/v23/services/syncbase/nosql.KeyValue";
_typeKeyValue.fields = [{name: "Key", type: vdl.types.STRING}, {name: "Value", type: _type3}];
@@ -109,11 +109,11 @@
_type6.freeze();
_type7.freeze();
_typeBatchOptions.freeze();
+_typeBlobFetchState.freeze();
+_typeBlobFetchStatus.freeze();
_typeBlobRef.freeze();
_typeCrPolicy.freeze();
_typeCrRule.freeze();
-_typeFetchState.freeze();
-_typeFetchStatus.freeze();
_typeKeyValue.freeze();
_typePrefixPermissions.freeze();
_typeResolverType.freeze();
@@ -122,16 +122,16 @@
_typeSyncGroupMemberInfo.freeze();
_typeSyncGroupSpec.freeze();
module.exports.BatchOptions = (vdl.registry.lookupOrCreateConstructor(_typeBatchOptions));
+module.exports.BlobFetchState = {
+ PENDING: canonicalize.reduce(new (vdl.registry.lookupOrCreateConstructor(_typeBlobFetchState))('Pending', true), _typeBlobFetchState),
+ LOCATING: canonicalize.reduce(new (vdl.registry.lookupOrCreateConstructor(_typeBlobFetchState))('Locating', true), _typeBlobFetchState),
+ FETCHING: canonicalize.reduce(new (vdl.registry.lookupOrCreateConstructor(_typeBlobFetchState))('Fetching', true), _typeBlobFetchState),
+ DONE: canonicalize.reduce(new (vdl.registry.lookupOrCreateConstructor(_typeBlobFetchState))('Done', true), _typeBlobFetchState),
+};
+module.exports.BlobFetchStatus = (vdl.registry.lookupOrCreateConstructor(_typeBlobFetchStatus));
module.exports.BlobRef = (vdl.registry.lookupOrCreateConstructor(_typeBlobRef));
module.exports.CrPolicy = (vdl.registry.lookupOrCreateConstructor(_typeCrPolicy));
module.exports.CrRule = (vdl.registry.lookupOrCreateConstructor(_typeCrRule));
-module.exports.FetchState = {
- PENDING: canonicalize.reduce(new (vdl.registry.lookupOrCreateConstructor(_typeFetchState))('Pending', true), _typeFetchState),
- LOCATING: canonicalize.reduce(new (vdl.registry.lookupOrCreateConstructor(_typeFetchState))('Locating', true), _typeFetchState),
- FETCHING: canonicalize.reduce(new (vdl.registry.lookupOrCreateConstructor(_typeFetchState))('Fetching', true), _typeFetchState),
- DONE: canonicalize.reduce(new (vdl.registry.lookupOrCreateConstructor(_typeFetchState))('Done', true), _typeFetchState),
-};
-module.exports.FetchStatus = (vdl.registry.lookupOrCreateConstructor(_typeFetchStatus));
module.exports.KeyValue = (vdl.registry.lookupOrCreateConstructor(_typeKeyValue));
module.exports.PrefixPermissions = (vdl.registry.lookupOrCreateConstructor(_typePrefixPermissions));
module.exports.ResolverType = {
@@ -185,6 +185,12 @@
]);
+module.exports.BlobNotCommittedError = makeError('v.io/syncbase/v23/services/syncbase/nosql.BlobNotCommitted', actions.NO_RETRY, {
+ 'en': '{1:}{2:} blob is not yet committed',
+}, [
+]);
+
+
// Services:
@@ -210,7 +216,7 @@
DatabaseWatcher.prototype._serviceDescription = {
name: 'DatabaseWatcher',
pkgPath: 'v.io/syncbase/v23/services/syncbase/nosql',
- doc: "// Watch allows a client to watch for updates in the database. For each watched\n// request, the client will receive a reliable stream of watch events without\n// re-ordering. See watch.GlobWatcher for a detailed explanation of the\n// behavior.\n//\n// The watching is done by starting a streaming RPC. The argument to the RPC\n// contains the ResumeMarker that points to a particular place in the database\n// event log. The result stream consists of a never-ending sequence of Change\n// messages (until the call fails or is canceled). Each Change contains the\n// Name field in the form \"<tableName>/<rowKey>\" and the Value field of the\n// StoreChange type. If the client has no access to a row specified in a change,\n// that change is excluded from the result stream.\n//\n// The DatabaseWatcher is designed to be used in the following way:\n// 1) begin a read-only batch\n// 2) read all information your app needs\n// 3) read the ResumeMarker\n// 4) abort the batch\n// 5) start watching changes to the data using the ResumeMarker\n// In this configuration the client doesn't miss any changes.",
+ doc: "// DatabaseWatcher allows a client to watch for updates in the database.\n// For each watched request, the client will receive a reliable stream of watch\n// events without re-ordering. See watch.GlobWatcher for a detailed explanation\n// of the behavior.\n// TODO(rogulenko): Currently the only supported watch patterns are\n// 'table/row*'. Consider changing that.\n//\n// The watching is done by starting a streaming RPC. The argument to the RPC\n// contains the ResumeMarker that points to a particular place in the database\n// event log. The result stream consists of a never-ending sequence of Change\n// messages (until the call fails or is canceled). Each Change contains the\n// Name field in the form \"<tableName>/<rowKey>\" and the Value field of the\n// StoreChange type. If the client has no access to a row specified in a change,\n// that change is excluded from the result stream.\n//\n// The DatabaseWatcher is designed to be used in the following way:\n// 1) begin a read-only batch\n// 2) read all information your app needs\n// 3) read the ResumeMarker\n// 4) abort the batch\n// 5) start watching changes to the data using the ResumeMarker\n// In this configuration the client doesn't miss any changes.",
embeds: [{
name: 'GlobWatcher',
pkgPath: 'v.io/v23/services/watch',
@@ -714,7 +720,7 @@
outStream: {
name: '',
doc: '',
- type: _typeFetchStatus
+ type: _typeBlobFetchStatus
},
tags: [canonicalize.reduce(new access.Tag("Read", true), new access.Tag()._type), ]
},
@@ -857,6 +863,11 @@
};
+Database.prototype.exec = function(ctx, serverCall, schemaVersion, query) {
+ throw new Error('Method Exec not implemented');
+};
+
+
Database.prototype.beginBatch = function(ctx, serverCall, schemaVersion, bo) {
throw new Error('Method BeginBatch not implemented');
};
@@ -867,11 +878,6 @@
};
-Database.prototype.exec = function(ctx, serverCall, schemaVersion, query) {
- throw new Error('Method Exec not implemented');
-};
-
-
Database.prototype.abort = function(ctx, serverCall, schemaVersion) {
throw new Error('Method Abort not implemented');
};
@@ -1015,7 +1021,7 @@
{
name: 'DatabaseWatcher',
pkgPath: 'v.io/syncbase/v23/services/syncbase/nosql',
- doc: "// Watch allows a client to watch for updates in the database. For each watched\n// request, the client will receive a reliable stream of watch events without\n// re-ordering. See watch.GlobWatcher for a detailed explanation of the\n// behavior.\n//\n// The watching is done by starting a streaming RPC. The argument to the RPC\n// contains the ResumeMarker that points to a particular place in the database\n// event log. The result stream consists of a never-ending sequence of Change\n// messages (until the call fails or is canceled). Each Change contains the\n// Name field in the form \"<tableName>/<rowKey>\" and the Value field of the\n// StoreChange type. If the client has no access to a row specified in a change,\n// that change is excluded from the result stream.\n//\n// The DatabaseWatcher is designed to be used in the following way:\n// 1) begin a read-only batch\n// 2) read all information your app needs\n// 3) read the ResumeMarker\n// 4) abort the batch\n// 5) start watching changes to the data using the ResumeMarker\n// In this configuration the client doesn't miss any changes."
+ doc: "// DatabaseWatcher allows a client to watch for updates in the database.\n// For each watched request, the client will receive a reliable stream of watch\n// events without re-ordering. See watch.GlobWatcher for a detailed explanation\n// of the behavior.\n// TODO(rogulenko): Currently the only supported watch patterns are\n// 'table/row*'. Consider changing that.\n//\n// The watching is done by starting a streaming RPC. The argument to the RPC\n// contains the ResumeMarker that points to a particular place in the database\n// event log. The result stream consists of a never-ending sequence of Change\n// messages (until the call fails or is canceled). Each Change contains the\n// Name field in the form \"<tableName>/<rowKey>\" and the Value field of the\n// StoreChange type. If the client has no access to a row specified in a change,\n// that change is excluded from the result stream.\n//\n// The DatabaseWatcher is designed to be used in the following way:\n// 1) begin a read-only batch\n// 2) read all information your app needs\n// 3) read the ResumeMarker\n// 4) abort the batch\n// 5) start watching changes to the data using the ResumeMarker\n// In this configuration the client doesn't miss any changes."
},
{
name: 'SyncGroupManager',
@@ -1095,6 +1101,31 @@
{
+ name: 'Exec',
+ doc: "// Exec executes a syncQL query and returns all results as specified by in the\n// query's select clause. Concurrency semantics are documented in model.go.",
+ inArgs: [{
+ name: 'schemaVersion',
+ doc: "",
+ type: vdl.types.INT32
+ },
+ {
+ name: 'query',
+ doc: "",
+ type: vdl.types.STRING
+ },
+ ],
+ outArgs: [],
+ inStream: null,
+ outStream: {
+ name: '',
+ doc: '',
+ type: _type6
+ },
+ tags: [canonicalize.reduce(new access.Tag("Read", true), new access.Tag()._type), ]
+ },
+
+
+ {
name: 'BeginBatch',
doc: "// BeginBatch creates a new batch. It returns an App-relative name for a\n// Database handle bound to this batch. If this Database is already bound to a\n// batch, BeginBatch() will fail with ErrBoundToBatch. Concurrency semantics\n// are documented in model.go.\n// TODO(sadovsky): make BatchOptions optional",
inArgs: [{
@@ -1137,31 +1168,6 @@
{
- name: 'Exec',
- doc: "// Exec executes a syncQL query and returns all results as specified by in the\n// query's select clause. Concurrency semantics are documented in model.go.",
- inArgs: [{
- name: 'schemaVersion',
- doc: "",
- type: vdl.types.INT32
- },
- {
- name: 'query',
- doc: "",
- type: vdl.types.STRING
- },
- ],
- outArgs: [],
- inStream: null,
- outStream: {
- name: '',
- doc: '',
- type: _type6
- },
- tags: [canonicalize.reduce(new access.Tag("Read", true), new access.Tag()._type), ]
- },
-
-
- {
name: 'Abort',
doc: "// Abort notifies the server that any pending changes can be discarded.\n// It is not strictly required, but it may allow the server to release locks\n// or other resources sooner than if it was not called.\n// If this Database is not bound to a batch, Abort() will fail with\n// ErrNotBoundToBatch.",
inArgs: [{
@@ -1582,7 +1588,7 @@
outStream: {
name: '',
doc: '',
- type: _typeFetchStatus
+ type: _typeBlobFetchStatus
},
tags: [canonicalize.reduce(new access.Tag("Read", true), new access.Tag()._type), ]
},
@@ -1791,7 +1797,7 @@
{
name: 'DeleteRowRange',
- doc: "// Delete deletes all rows in the given half-open range [start, limit). If\n// limit is \"\", all rows with keys >= start are included.",
+ doc: "// Delete deletes all rows in the given half-open range [start, limit). If\n// limit is \"\", all rows with keys >= start are included.\n// TODO(sadovsky): Delete prefix perms fully covered by the row range?",
inArgs: [{
name: 'schemaVersion',
doc: "",
diff --git a/src/gen-vdl/v.io/v23/security/index.js b/src/gen-vdl/v.io/v23/security/index.js
index 5221584..de0e2eb 100644
--- a/src/gen-vdl/v.io/v23/security/index.js
+++ b/src/gen-vdl/v.io/v23/security/index.js
@@ -273,17 +273,17 @@
module.exports.SHA512Hash = canonicalize.reduce(new (vdl.registry.lookupOrCreateConstructor(_typeHash))("SHA512", true), _typeHash);
- module.exports.SignatureForMessageSigning = canonicalize.reduce(new (vdl.registry.lookupOrCreateConstructor(vdl.types.STRING))("S", true), vdl.types.STRING);
+ module.exports.SignatureForMessageSigning = canonicalize.reduce(new (vdl.registry.lookupOrCreateConstructor(vdl.types.STRING))("S1", true), vdl.types.STRING);
- module.exports.SignatureForBlessingCertificates = canonicalize.reduce(new (vdl.registry.lookupOrCreateConstructor(vdl.types.STRING))("B", true), vdl.types.STRING);
+ module.exports.SignatureForBlessingCertificates = canonicalize.reduce(new (vdl.registry.lookupOrCreateConstructor(vdl.types.STRING))("B1", true), vdl.types.STRING);
- module.exports.SignatureForDischarge = canonicalize.reduce(new (vdl.registry.lookupOrCreateConstructor(vdl.types.STRING))("D", true), vdl.types.STRING);
+ module.exports.SignatureForDischarge = canonicalize.reduce(new (vdl.registry.lookupOrCreateConstructor(vdl.types.STRING))("D1", true), vdl.types.STRING);
- module.exports.SignatureForMessageSigningV1 = canonicalize.reduce(new (vdl.registry.lookupOrCreateConstructor(vdl.types.STRING))("S1", true), vdl.types.STRING);
+ module.exports.SignatureForMessageSigningV0 = canonicalize.reduce(new (vdl.registry.lookupOrCreateConstructor(vdl.types.STRING))("S", true), vdl.types.STRING);
- module.exports.SignatureForBlessingCertificatesV1 = canonicalize.reduce(new (vdl.registry.lookupOrCreateConstructor(vdl.types.STRING))("B1", true), vdl.types.STRING);
+ module.exports.SignatureForBlessingCertificatesV0 = canonicalize.reduce(new (vdl.registry.lookupOrCreateConstructor(vdl.types.STRING))("B", true), vdl.types.STRING);
- module.exports.SignatureForDischargeV1 = canonicalize.reduce(new (vdl.registry.lookupOrCreateConstructor(vdl.types.STRING))("D1", true), vdl.types.STRING);
+ module.exports.SignatureForDischargeV0 = canonicalize.reduce(new (vdl.registry.lookupOrCreateConstructor(vdl.types.STRING))("D", true), vdl.types.STRING);
diff --git a/src/nosql/database.js b/src/nosql/database.js
index 58c0993..febf66f 100644
--- a/src/nosql/database.js
+++ b/src/nosql/database.js
@@ -10,11 +10,14 @@
// vanadium.vdl object.
var unwrap = require('vanadium/src/vdl/type-util').unwrap;
-var BatchDatabase = require('./batch-database');
var nosqlVdl = require('../gen-vdl/v.io/syncbase/v23/services/syncbase/nosql');
+var watchVdl = require('../gen-vdl/v.io/v23/services/watch');
+
+var BatchDatabase = require('./batch-database');
var SyncGroup = require('./syncgroup');
var Table = require('./table');
var util = require('../util');
+var watch = require('./watch');
/**
* Database represents a collection of Tables. Batches, queries, sync, watch,
@@ -214,6 +217,80 @@
};
/**
+ * Watches for updates to the database. For each watch request, the client will
+ * receive a reliable stream of watch events without re-ordering.
+ *
+ * This method is designed to be used in the following way:
+ * 1) begin a read-only batch
+ * 2) read all information your app needs
+ * 3) read the ResumeMarker
+ * 4) abort the batch
+ * 5) start watching for changes to the data using the ResumeMarker
+ *
+ * In this configuration the client doesn't miss any changes.
+ *
+ * @param {module:vanadium.context.Context} ctx Vanadium context.
+ * @param {string} table Name of table to watch.
+ * @param {string} prefix Prefix of keys to watch.
+ * @param {module:syncbase.nosql.watch.ResumeMarker} resumeMarker ResumeMarker
+ * to resume watching from.
+ * @param {function} [cb] Optional callback that will be called after watch RPC
+ * finishes.
+ * @returns {stream} Stream of WatchChange objects.
+ */
+Database.prototype.watch = function(ctx, tableName, prefix, resumeMarker, cb) {
+ var globReq = new watchVdl.GlobRequest({
+ pattern: vanadium.naming.join(tableName, prefix + '*'),
+ resumeMarker: resumeMarker
+ });
+
+ var watchChangeEncoder = through2({
+ objectMode: true
+ }, function(change, enc, cb) {
+ var changeType;
+ switch (change.state) {
+ case watchVdl.Exists.val:
+ changeType = 'put';
+ break;
+ case watchVdl.DoesNotExist.val:
+ changeType = 'delete';
+ break;
+ default:
+ return cb(new Error('invalid change state ' + change.state));
+ }
+
+ var wc = new watch.WatchChange({
+ tableName: vanadium.naming.stripBasename(change.name),
+ rowName: vanadium.naming.basename(change.name),
+ changeType: changeType,
+ valueBytes: changeType === 'put' ? change.value.value : null,
+ resumeMarker: change.resumeMarker,
+ fromSync: change.value.fromSync,
+ continued: change.continued
+ });
+ return cb(null, wc);
+ });
+
+ var stream = this._wire(ctx).watchGlob(ctx, globReq, cb).stream;
+
+ var watchChangeStream = stream.pipe(watchChangeEncoder);
+ stream.on('error', function(err) {
+ watchChangeStream.emit('error', err);
+ });
+
+ return watchChangeStream;
+};
+
+/**
+ * Gets the ResumeMarker that points to the current end of the event log.
+ * @param {module:vanadium.context.Context} ctx Vanadium context.
+ * @param {function} cb Callback.
+ */
+Database.prototype.getResumeMarker = function(ctx, cb) {
+ this._wire(ctx).getResumeMarker(ctx, cb);
+};
+
+/**
* Replaces the current Permissions for the Database.
* @param {module:vanadium.context.Context} ctx Vanadium context.
* @param {module:vanadium.security.access.Permissions} perms Permissions for
diff --git a/src/nosql/index.js b/src/nosql/index.js
index c589058..5bbab69 100644
--- a/src/nosql/index.js
+++ b/src/nosql/index.js
@@ -5,6 +5,7 @@
var rowrange = require('./rowrange');
var runInBatch = require('./batch');
var vdl = require('../gen-vdl/v.io/syncbase/v23/services/syncbase/nosql');
+var watch = require('./watch');
/**
* @summary
@@ -16,8 +17,10 @@
module.exports = {
BatchOptions: vdl.BatchOptions,
ReadOnlyBatchError: vdl.ReadOnlyBatchError,
+ ResumeMarker: watch.ResumeMarker,
rowrange: rowrange,
runInBatch: runInBatch,
SyncGroupMemberInfo: vdl.SyncGroupMemberInfo,
- SyncGroupSpec: vdl.SyncGroupSpec
+ SyncGroupSpec: vdl.SyncGroupSpec,
+ WatchChange: watch.WatchChange
};
diff --git a/src/nosql/watch.js b/src/nosql/watch.js
new file mode 100644
index 0000000..dba87e2
--- /dev/null
+++ b/src/nosql/watch.js
@@ -0,0 +1,107 @@
+// Copyright 2015 The Vanadium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+var vom = require('vanadium').vom;
+
+var watchVdl = require('../gen-vdl/v.io/v23/services/watch');
+
+module.exports = {
+ ResumeMarker: watchVdl.ResumeMarker,
+ WatchChange: WatchChange
+};
+
+/**
+ * WatchChange represents the new value for a watched entity.
+ * @constructor
+ */
+function WatchChange(opts) {
+ /**
+ * @property tableName
+ * The name of the table that contains the changed row
+ */
+ Object.defineProperty(this, 'tableName', {
+ enumerable: true,
+ value: opts.tableName,
+ writable: false
+ });
+
+ /**
+ * @property rowName
+ * Name of the changed row.
+ */
+ Object.defineProperty(this, 'rowName', {
+ enumerable: true,
+ value: opts.rowName,
+ writable: false
+ });
+
+ /**
+ * @property changeType
+ * Describes the type of the change. If the changeType equals 'put', then the
+ * row exists in the table and the value contains the new value for this row.
+ * If the state equals 'delete', then the row was removed from the table.
+ */
+ Object.defineProperty(this, 'changeType', {
+ enumerable: true,
+ value: opts.changeType,
+ writable: false
+ });
+
+ /**
+ * @property valueBytes
+ * The new VOM-encoded value for the row if the changeType is 'put' or nil
+ * otherwise.
+ */
+ Object.defineProperty(this, 'valueBytes', {
+ enumerable: true,
+ value: opts.valueBytes,
+ writable: false
+ });
+
+ /**
+ * @property resumeMarker
+ * Provides a compact representation of all the messages that have been
+ * received by the caller for the given watch call. This marker can be
+ * provided in the request message to allow the caller to resume the stream
+ * watching at a specific point without fetching the initial state.
+ */
+ Object.defineProperty(this, 'resumeMarker', {
+ enumerable: true,
+ value: opts.resumeMarker,
+ writable: false
+ });
+
+ /**
+ * @property fromSync
+ * Indicates whether the change came from sync. If fromSync is false, then
+ * the change originated from the local device.
+ */
+ Object.defineProperty(this, 'fromSync', {
+ enumerable: true,
+ value: opts.fromSync || false,
+ writable: false
+ });
+
+ /**
+ * @property continued
+ * If true, this WatchChange is followed by more WatchChanges that are in the
+ * same batch as this WatchChange
+ */
+ Object.defineProperty(this, 'continued', {
+ enumerable: true,
+ value: opts.continued || false,
+ writable: false
+ });
+}
+
+/**
+ * Decodes the new value of the watched element.
+ */
+WatchChange.prototype.getValue = function(cb) {
+ if (this.changeType === 'delete') {
+ return cb(new Error('invalid change type'));
+ }
+
+ vom.decode(this.valueBytes, false, null, cb);
+};
diff --git a/test/integration/test-watch.js b/test/integration/test-watch.js
new file mode 100644
index 0000000..fca1cec
--- /dev/null
+++ b/test/integration/test-watch.js
@@ -0,0 +1,132 @@
+// Copyright 2015 The Vanadium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+var async = require('async');
+var test = require('prova');
+var vom = require('vanadium').vom;
+
+var syncbase = require('../..');
+var WatchChange = syncbase.nosql.WatchChange;
+
+var testUtil = require('./util');
+var setupTable = testUtil.setupTable;
+var uniqueName = testUtil.uniqueName;
+
+// Tests the basic client watch functionality (no perms or batches). First
+// does some puts and deletes, fetching a ResumeMarker after each operation.
+// Then calls 'watch' with different prefixes and ResumeMarkers and verifies
+// that the resulting stream contains the correct changes.
+test('basic client watch', function(t) {
+ setupTable(t, function(err, o) {
+ if (err) {
+ return t.end(err);
+ }
+
+ var ctx = o.ctx;
+ var db = o.database;
+ var table = o.table;
+
+ var row1Prefix = 'row-abc';
+ var row1 = table.row(uniqueName(row1Prefix));
+ var value1 = uniqueName('value');
+
+ var row2Prefix = 'row-a';
+ var row2 = table.row(uniqueName(row2Prefix));
+ var value2 = uniqueName('value');
+
+ var resumeMarkers = [];
+
+ function getAndAppendResumeMarker(cb) {
+ db.getResumeMarker(ctx, function(err, rm) {
+ if (err) {
+ return cb(err);
+ }
+ resumeMarkers.push(rm);
+ cb(null);
+ });
+ }
+
+ // Generate the data and resume markers.
+ async.waterfall([
+ // Initial state.
+ getAndAppendResumeMarker,
+
+ // Put to row1.
+ row1.put.bind(row1, ctx, value1),
+ getAndAppendResumeMarker,
+
+ // Delete row1.
+ row1.delete.bind(row1, ctx),
+ getAndAppendResumeMarker,
+
+ // Put to row2.
+ row2.put.bind(row2, ctx, value2),
+ getAndAppendResumeMarker
+ ], assertCorrectChanges);
+
+ function assertCorrectChanges(err) {
+ if (err) {
+ t.error(err);
+ return o.teardown(t.end);
+ }
+
+ var allExpectedChanges = [new WatchChange({
+ tableName: table.name,
+ rowName: row1.key,
+ changeType: 'put',
+ valueBytes: vom.encode(value1),
+ resumeMarker: resumeMarkers[1]
+ }), new WatchChange({
+ tableName: table.name,
+ rowName: row1.key,
+ changeType: 'delete',
+ valueBytes: null,
+ resumeMarker: resumeMarkers[2]
+ }), new WatchChange({
+ tableName: table.name,
+ rowName: row2.key,
+ changeType: 'put',
+ valueBytes: vom.encode(value2),
+ resumeMarker: resumeMarkers[3]
+ })];
+
+ async.series([
+ assertWatch.bind(null, t, ctx, db, table.name, row2Prefix,
+ resumeMarkers[0], allExpectedChanges),
+ assertWatch.bind(null, t, ctx, db, table.name, row2Prefix,
+ resumeMarkers[1], allExpectedChanges.slice(1)),
+ assertWatch.bind(null, t, ctx, db, table.name, row2Prefix,
+ resumeMarkers[2], allExpectedChanges.slice(2)),
+
+ assertWatch.bind(null, t, ctx, db, table.name, row1Prefix,
+ resumeMarkers[0], allExpectedChanges.slice(0,2)),
+ assertWatch.bind(null, t, ctx, db, table.name, row1Prefix,
+ resumeMarkers[1], allExpectedChanges.slice(1,2)),
+ ], function(err) {
+ t.error(err);
+ o.teardown(t.end);
+ });
+ }
+ });
+});
+
+function assertWatch(t, ctx, db, tableName, rowPrefix, resumeMarker,
+ expectedWatchChanges, cb) {
+ var cctx = ctx.withCancel();
+ var stream = db.watch(ctx, tableName, rowPrefix, resumeMarker);
+
+ async.timesSeries(expectedWatchChanges.length, function(i, next) {
+ stream.once('data', function(gotWatchChange) {
+ t.deepEqual(gotWatchChange, expectedWatchChanges[i]);
+
+ next(null);
+ });
+ }, function(err) {
+ cctx.finish();
+ if (err) {
+ return cb(err);
+ }
+ cb(null);
+ });
+}