syncbase/watch: Return initial state for empty ResumeMarker.
Empty ResumeMarker is a convenience for Scan followed by Watch,
returning the initial state of the watched row set as the first
change batch, as specified in v.io/v23/watch.
Fixes v.io/i/693 and most of v.io/i/917
MultiPart: 5/5
Change-Id: If001b8e0b4acfc21bcbcc1ab54ad7317d699292c
diff --git a/src/gen-vdl/v.io/v23/services/syncbase/nosql/index.js b/src/gen-vdl/v.io/v23/services/syncbase/nosql/index.js
index 91dc09c..ca80fa4 100644
--- a/src/gen-vdl/v.io/v23/services/syncbase/nosql/index.js
+++ b/src/gen-vdl/v.io/v23/services/syncbase/nosql/index.js
@@ -328,7 +328,7 @@
DatabaseWatcher.prototype._serviceDescription = {
name: 'DatabaseWatcher',
pkgPath: 'v.io/v23/services/syncbase/nosql',
- doc: "// DatabaseWatcher allows a client to watch for updates to the database. For\n// each watch request, the client will receive a reliable stream of watch events\n// without re-ordering. See watch.GlobWatcher for a detailed explanation of the\n// behavior.\n// TODO(rogulenko): Currently the only supported watch patterns are\n// \"<tableName>/<rowPrefix>*\". Consider changing that.\n//\n// The watching is done by starting a streaming RPC. The argument to the RPC\n// contains the ResumeMarker that points to a particular place in the database\n// event log. The result stream consists of a never-ending sequence of Change\n// messages (until the call fails or is canceled). Each Change contains the Name\n// field in the form \"<tableName>/<rowKey>\" and the Value field of the\n// StoreChange type. If the client has no access to a row specified in a change,\n// that change is excluded from the result stream.\n//\n// DatabaseWatcher is designed to be used in the following way:\n// 1) begin a read-only batch\n// 2) read all data your app needs\n// 3) read the ResumeMarker\n// 4) abort the batch\n// 5) start watching for changes to the data using the ResumeMarker\n// In this configuration the client will not miss any changes to the data.",
+ doc: "// DatabaseWatcher allows a client to watch for updates to the database. For\n// each watch request, the client will receive a reliable stream of watch events\n// without re-ordering. See watch.GlobWatcher for a detailed explanation of the\n// behavior.\n// TODO(rogulenko): Currently the only supported watch patterns are\n// \"<tableName>/<rowPrefix>*\". Consider changing that.\n//\n// Watching is done by starting a streaming RPC. The RPC takes a ResumeMarker\n// argument that points to a particular place in the database event log. If an\n// empty ResumeMarker is provided, the WatchStream will begin with a Change\n// batch containing the initial state. Otherwise, the WatchStream will contain\n// only changes since the provided ResumeMarker.\n//\n// The result stream consists of a never-ending sequence of Change messages\n// (until the call fails or is canceled). Each Change contains the Name field\n// in the form \"<tableName>/<rowKey>\" and the Value field of the StoreChange\n// type. If the client has no access to a row specified in a change, that change\n// is excluded from the result stream.\n//\n// Note: A single Watch Change batch may contain changes from more than one\n// batch as originally committed on a remote Syncbase or obtained from conflict\n// resolution. However, changes from a single original batch will always appear\n// in the same Change batch.",
embeds: [{
name: 'GlobWatcher',
pkgPath: 'v.io/v23/services/watch',
@@ -1042,6 +1042,16 @@
};
+Database.prototype.pauseSync = function(ctx, serverCall) {
+ throw new Error('Method PauseSync not implemented');
+};
+
+
+Database.prototype.resumeSync = function(ctx, serverCall) {
+ throw new Error('Method ResumeSync not implemented');
+};
+
+
Database.prototype.setPermissions = function(ctx, serverCall, perms, version) {
throw new Error('Method SetPermissions not implemented');
};
@@ -1185,7 +1195,7 @@
{
name: 'DatabaseWatcher',
pkgPath: 'v.io/v23/services/syncbase/nosql',
- doc: "// DatabaseWatcher allows a client to watch for updates to the database. For\n// each watch request, the client will receive a reliable stream of watch events\n// without re-ordering. See watch.GlobWatcher for a detailed explanation of the\n// behavior.\n// TODO(rogulenko): Currently the only supported watch patterns are\n// \"<tableName>/<rowPrefix>*\". Consider changing that.\n//\n// The watching is done by starting a streaming RPC. The argument to the RPC\n// contains the ResumeMarker that points to a particular place in the database\n// event log. The result stream consists of a never-ending sequence of Change\n// messages (until the call fails or is canceled). Each Change contains the Name\n// field in the form \"<tableName>/<rowKey>\" and the Value field of the\n// StoreChange type. If the client has no access to a row specified in a change,\n// that change is excluded from the result stream.\n//\n// DatabaseWatcher is designed to be used in the following way:\n// 1) begin a read-only batch\n// 2) read all data your app needs\n// 3) read the ResumeMarker\n// 4) abort the batch\n// 5) start watching for changes to the data using the ResumeMarker\n// In this configuration the client will not miss any changes to the data."
+ doc: "// DatabaseWatcher allows a client to watch for updates to the database. For\n// each watch request, the client will receive a reliable stream of watch events\n// without re-ordering. See watch.GlobWatcher for a detailed explanation of the\n// behavior.\n// TODO(rogulenko): Currently the only supported watch patterns are\n// \"<tableName>/<rowPrefix>*\". Consider changing that.\n//\n// Watching is done by starting a streaming RPC. The RPC takes a ResumeMarker\n// argument that points to a particular place in the database event log. If an\n// empty ResumeMarker is provided, the WatchStream will begin with a Change\n// batch containing the initial state. Otherwise, the WatchStream will contain\n// only changes since the provided ResumeMarker.\n//\n// The result stream consists of a never-ending sequence of Change messages\n// (until the call fails or is canceled). Each Change contains the Name field\n// in the form \"<tableName>/<rowKey>\" and the Value field of the StoreChange\n// type. If the client has no access to a row specified in a change, that change\n// is excluded from the result stream.\n//\n// Note: A single Watch Change batch may contain changes from more than one\n// batch as originally committed on a remote Syncbase or obtained from conflict\n// resolution. However, changes from a single original batch will always appear\n// in the same Change batch."
},
{
name: 'SyncgroupManager',
@@ -1369,6 +1379,28 @@
{
+ name: 'PauseSync',
+ doc: "// PauseSync pauses sync for this database. Incoming sync, as well as\n// outgoing sync of subsequent writes, will be disabled until ResumeSync\n// is called. PauseSync is idempotent.",
+ inArgs: [],
+ outArgs: [],
+ inStream: null,
+ outStream: null,
+ tags: [canonicalize.reduce(new access.Tag("Write", true), new access.Tag()._type), ]
+ },
+
+
+ {
+ name: 'ResumeSync',
+ doc: "// ResumeSync resumes sync for this database. ResumeSync is idempotent.",
+ inArgs: [],
+ outArgs: [],
+ inStream: null,
+ outStream: null,
+ tags: [canonicalize.reduce(new access.Tag("Write", true), new access.Tag()._type), ]
+ },
+
+
+ {
name: 'SetPermissions',
doc: "// SetPermissions replaces the current Permissions for an object. version\n// allows for optional, optimistic concurrency control. If non-empty,\n// version's value must come from GetPermissions. If any client has\n// successfully called SetPermissions in the meantime, the version will be\n// stale and SetPermissions will fail. If empty, SetPermissions performs an\n// unconditional update.\n//\n// Permissions objects are expected to be small. It is up to the\n// implementation to define the exact limit, though it should probably be\n// around 100KB. Large lists of principals can be represented concisely using\n// blessings.\n//\n// There is some ambiguity when calling SetPermissions on a mount point.\n// Does it affect the mount itself or does it affect the service endpoint\n// that the mount points to? The chosen behavior is that it affects the\n// service endpoint. To modify the mount point's Permissions, use\n// ResolveToMountTable to get an endpoint and call SetPermissions on that.\n// This means that clients must know when a name refers to a mount point to\n// change its Permissions.",
inArgs: [{
diff --git a/src/nosql/database.js b/src/nosql/database.js
index 8558f12..6608b19 100644
--- a/src/nosql/database.js
+++ b/src/nosql/database.js
@@ -103,25 +103,26 @@
* Watches for updates to the database. For each watch request, the client will
* receive a reliable stream of watch events without re-ordering.
*
- * This method is designed to be used in the following way:
- * 1) begin a read-only batch
- * 2) read all information your app needs
- * 3) read the ResumeMarker
- * 4) abort the batch
- * 5) start watching for changes to the data using the ResumeMarker
- *
- * In this configuration the client doesn't miss any changes.
+ * If the ResumeMarker is not provided, the stream will begin with a change
+ * batch containing the initial state. Otherwise, the stream will contain only
+ * changes since the provided ResumeMarker.
*
* @param {module:vanadium.context.Context} ctx Vanadium context.
* @param {string} table Name of table to watch.
* @param {string} prefix Prefix of keys to watch.
- * @param {module:syncbase.nosql.watch.ResumeMarker} resumeMarker ResumeMarker
- * to resume watching from.
+ * @param {module:syncbase.nosql.watch.ResumeMarker} [resumeMarker] ResumeMarker
+ * to resume watching from. If not provided, watch stream begins with a batch
+ * containing the initial state.
* @param {function} [cb] Optional callback that will be called after watch RPC
* finishes.
* @returns {stream} Stream of WatchChange objects.
*/
Database.prototype.watch = function(ctx, tableName, prefix, resumeMarker, cb) {
+ if (typeof cb === 'undefined' && typeof resumeMarker === 'function') {
+ cb = resumeMarker;
+ resumeMarker = undefined;
+ }
+
var globReq = new watchVdl.GlobRequest({
pattern: vanadium.naming.join(tableName, prefix + '*'),
resumeMarker: resumeMarker
diff --git a/test/integration/test-watch.js b/test/integration/test-watch.js
index fca1cec..d95f00c 100644
--- a/test/integration/test-watch.js
+++ b/test/integration/test-watch.js
@@ -99,6 +99,10 @@
assertWatch.bind(null, t, ctx, db, table.name, row2Prefix,
resumeMarkers[2], allExpectedChanges.slice(2)),
+ // Undefined resume marker - include initial state.
+ assertWatch.bind(null, t, ctx, db, table.name, row2Prefix,
+ undefined, allExpectedChanges.slice(2)),
+
assertWatch.bind(null, t, ctx, db, table.name, row1Prefix,
resumeMarkers[0], allExpectedChanges.slice(0,2)),
assertWatch.bind(null, t, ctx, db, table.name, row1Prefix,
@@ -114,7 +118,9 @@
function assertWatch(t, ctx, db, tableName, rowPrefix, resumeMarker,
expectedWatchChanges, cb) {
var cctx = ctx.withCancel();
- var stream = db.watch(ctx, tableName, rowPrefix, resumeMarker);
+ var stream = resumeMarker === undefined ?
+ db.watch(ctx, tableName, rowPrefix) :
+ db.watch(ctx, tableName, rowPrefix, resumeMarker);
async.timesSeries(expectedWatchChanges.length, function(i, next) {
stream.once('data', function(gotWatchChange) {