syncbase/watch: Return initial state for empty ResumeMarker.

Empty ResumeMarker is a convenience for Scan followed by Watch,
returning the initial state of the watched row set as the first
change batch, as specified in v.io/v23/watch.

Fixes v.io/i/693 and most of v.io/i/917

MultiPart: 5/5
Change-Id: If001b8e0b4acfc21bcbcc1ab54ad7317d699292c
diff --git a/src/gen-vdl/v.io/v23/services/syncbase/nosql/index.js b/src/gen-vdl/v.io/v23/services/syncbase/nosql/index.js
index 91dc09c..ca80fa4 100644
--- a/src/gen-vdl/v.io/v23/services/syncbase/nosql/index.js
+++ b/src/gen-vdl/v.io/v23/services/syncbase/nosql/index.js
@@ -328,7 +328,7 @@
 DatabaseWatcher.prototype._serviceDescription = {
   name: 'DatabaseWatcher',
   pkgPath: 'v.io/v23/services/syncbase/nosql',
-  doc: "// DatabaseWatcher allows a client to watch for updates to the database. For\n// each watch request, the client will receive a reliable stream of watch events\n// without re-ordering. See watch.GlobWatcher for a detailed explanation of the\n// behavior.\n// TODO(rogulenko): Currently the only supported watch patterns are\n// \"<tableName>/<rowPrefix>*\". Consider changing that.\n//\n// The watching is done by starting a streaming RPC. The argument to the RPC\n// contains the ResumeMarker that points to a particular place in the database\n// event log. The result stream consists of a never-ending sequence of Change\n// messages (until the call fails or is canceled). Each Change contains the Name\n// field in the form \"<tableName>/<rowKey>\" and the Value field of the\n// StoreChange type. If the client has no access to a row specified in a change,\n// that change is excluded from the result stream.\n//\n// DatabaseWatcher is designed to be used in the following way:\n// 1) begin a read-only batch\n// 2) read all data your app needs\n// 3) read the ResumeMarker\n// 4) abort the batch\n// 5) start watching for changes to the data using the ResumeMarker\n// In this configuration the client will not miss any changes to the data.",
+  doc: "// DatabaseWatcher allows a client to watch for updates to the database. For\n// each watch request, the client will receive a reliable stream of watch events\n// without re-ordering. See watch.GlobWatcher for a detailed explanation of the\n// behavior.\n// TODO(rogulenko): Currently the only supported watch patterns are\n// \"<tableName>/<rowPrefix>*\". Consider changing that.\n//\n// Watching is done by starting a streaming RPC. The RPC takes a ResumeMarker\n// argument that points to a particular place in the database event log. If an\n// empty ResumeMarker is provided, the WatchStream will begin with a Change\n// batch containing the initial state. Otherwise, the WatchStream will contain\n// only changes since the provided ResumeMarker.\n//\n// The result stream consists of a never-ending sequence of Change messages\n// (until the call fails or is canceled). Each Change contains the Name field\n// in the form \"<tableName>/<rowKey>\" and the Value field of the StoreChange\n// type. If the client has no access to a row specified in a change, that change\n// is excluded from the result stream.\n//\n// Note: A single Watch Change batch may contain changes from more than one\n// batch as originally committed on a remote Syncbase or obtained from conflict\n// resolution. However, changes from a single original batch will always appear\n// in the same Change batch.",
   embeds: [{
       name: 'GlobWatcher',
       pkgPath: 'v.io/v23/services/watch',
@@ -1042,6 +1042,16 @@
 };
     
       
+Database.prototype.pauseSync = function(ctx, serverCall) {
+  throw new Error('Method PauseSync not implemented');
+};
+    
+      
+Database.prototype.resumeSync = function(ctx, serverCall) {
+  throw new Error('Method ResumeSync not implemented');
+};
+    
+      
 Database.prototype.setPermissions = function(ctx, serverCall, perms, version) {
   throw new Error('Method SetPermissions not implemented');
 };
@@ -1185,7 +1195,7 @@
     {
       name: 'DatabaseWatcher',
       pkgPath: 'v.io/v23/services/syncbase/nosql',
-      doc: "// DatabaseWatcher allows a client to watch for updates to the database. For\n// each watch request, the client will receive a reliable stream of watch events\n// without re-ordering. See watch.GlobWatcher for a detailed explanation of the\n// behavior.\n// TODO(rogulenko): Currently the only supported watch patterns are\n// \"<tableName>/<rowPrefix>*\". Consider changing that.\n//\n// The watching is done by starting a streaming RPC. The argument to the RPC\n// contains the ResumeMarker that points to a particular place in the database\n// event log. The result stream consists of a never-ending sequence of Change\n// messages (until the call fails or is canceled). Each Change contains the Name\n// field in the form \"<tableName>/<rowKey>\" and the Value field of the\n// StoreChange type. If the client has no access to a row specified in a change,\n// that change is excluded from the result stream.\n//\n// DatabaseWatcher is designed to be used in the following way:\n// 1) begin a read-only batch\n// 2) read all data your app needs\n// 3) read the ResumeMarker\n// 4) abort the batch\n// 5) start watching for changes to the data using the ResumeMarker\n// In this configuration the client will not miss any changes to the data."
+      doc: "// DatabaseWatcher allows a client to watch for updates to the database. For\n// each watch request, the client will receive a reliable stream of watch events\n// without re-ordering. See watch.GlobWatcher for a detailed explanation of the\n// behavior.\n// TODO(rogulenko): Currently the only supported watch patterns are\n// \"<tableName>/<rowPrefix>*\". Consider changing that.\n//\n// Watching is done by starting a streaming RPC. The RPC takes a ResumeMarker\n// argument that points to a particular place in the database event log. If an\n// empty ResumeMarker is provided, the WatchStream will begin with a Change\n// batch containing the initial state. Otherwise, the WatchStream will contain\n// only changes since the provided ResumeMarker.\n//\n// The result stream consists of a never-ending sequence of Change messages\n// (until the call fails or is canceled). Each Change contains the Name field\n// in the form \"<tableName>/<rowKey>\" and the Value field of the StoreChange\n// type. If the client has no access to a row specified in a change, that change\n// is excluded from the result stream.\n//\n// Note: A single Watch Change batch may contain changes from more than one\n// batch as originally committed on a remote Syncbase or obtained from conflict\n// resolution. However, changes from a single original batch will always appear\n// in the same Change batch."
     },
     {
       name: 'SyncgroupManager',
@@ -1369,6 +1379,28 @@
     
       
     {
+    name: 'PauseSync',
+    doc: "// PauseSync pauses sync for this database. Incoming sync, as well as\n// outgoing sync of subsequent writes, will be disabled until ResumeSync\n// is called. PauseSync is idempotent.",
+    inArgs: [],
+    outArgs: [],
+    inStream: null,
+    outStream: null,
+    tags: [canonicalize.reduce(new access.Tag("Write", true), new access.Tag()._type), ]
+  },
+    
+      
+    {
+    name: 'ResumeSync',
+    doc: "// ResumeSync resumes sync for this database. ResumeSync is idempotent.",
+    inArgs: [],
+    outArgs: [],
+    inStream: null,
+    outStream: null,
+    tags: [canonicalize.reduce(new access.Tag("Write", true), new access.Tag()._type), ]
+  },
+    
+      
+    {
     name: 'SetPermissions',
     doc: "// SetPermissions replaces the current Permissions for an object.  version\n// allows for optional, optimistic concurrency control.  If non-empty,\n// version's value must come from GetPermissions.  If any client has\n// successfully called SetPermissions in the meantime, the version will be\n// stale and SetPermissions will fail.  If empty, SetPermissions performs an\n// unconditional update.\n//\n// Permissions objects are expected to be small.  It is up to the\n// implementation to define the exact limit, though it should probably be\n// around 100KB.  Large lists of principals can be represented concisely using\n// blessings.\n//\n// There is some ambiguity when calling SetPermissions on a mount point.\n// Does it affect the mount itself or does it affect the service endpoint\n// that the mount points to?  The chosen behavior is that it affects the\n// service endpoint.  To modify the mount point's Permissions, use\n// ResolveToMountTable to get an endpoint and call SetPermissions on that.\n// This means that clients must know when a name refers to a mount point to\n// change its Permissions.",
     inArgs: [{
diff --git a/src/nosql/database.js b/src/nosql/database.js
index 8558f12..6608b19 100644
--- a/src/nosql/database.js
+++ b/src/nosql/database.js
@@ -103,25 +103,26 @@
  * Watches for updates to the database. For each watch request, the client will
  * receive a reliable stream of watch events without re-ordering.
  *
- * This method is designed to be used in the following way:
- * 1) begin a read-only batch
- * 2) read all information your app needs
- * 3) read the ResumeMarker
- * 4) abort the batch
- * 5) start watching for changes to the data using the ResumeMarker
- *
- * In this configuration the client doesn't miss any changes.
+ * If the ResumeMarker is not provided, the stream will begin with a change
+ * batch containing the initial state. Otherwise, the stream will contain only
+ * changes since the provided ResumeMarker.
  *
  * @param {module:vanadium.context.Context} ctx Vanadium context.
  * @param {string} table Name of table to watch.
  * @param {string} prefix Prefix of keys to watch.
- * @param {module:syncbase.nosql.watch.ResumeMarker} resumeMarker ResumeMarker
- * to resume watching from.
+ * @param {module:syncbase.nosql.watch.ResumeMarker} [resumeMarker] ResumeMarker
+ * to resume watching from. If not provided, watch stream begins with a batch
+ * containing the initial state.
  * @param {function} [cb] Optional callback that will be called after watch RPC
  * finishes.
  * @returns {stream} Stream of WatchChange objects.
  */
 Database.prototype.watch = function(ctx, tableName, prefix, resumeMarker, cb) {
+  if (typeof cb === 'undefined' && typeof resumeMarker === 'function') {
+    cb = resumeMarker;
+    resumeMarker = undefined;
+  }
+
   var globReq = new watchVdl.GlobRequest({
     pattern: vanadium.naming.join(tableName, prefix + '*'),
     resumeMarker: resumeMarker
diff --git a/test/integration/test-watch.js b/test/integration/test-watch.js
index fca1cec..d95f00c 100644
--- a/test/integration/test-watch.js
+++ b/test/integration/test-watch.js
@@ -99,6 +99,10 @@
         assertWatch.bind(null, t, ctx, db, table.name, row2Prefix,
                          resumeMarkers[2], allExpectedChanges.slice(2)),
 
+        // Undefined resume marker - include initial state.
+        assertWatch.bind(null, t, ctx, db, table.name, row2Prefix,
+                         undefined, allExpectedChanges.slice(2)),
+
         assertWatch.bind(null, t, ctx, db, table.name, row1Prefix,
                          resumeMarkers[0], allExpectedChanges.slice(0,2)),
         assertWatch.bind(null, t, ctx, db, table.name, row1Prefix,
@@ -114,7 +118,9 @@
 function assertWatch(t, ctx, db, tableName, rowPrefix, resumeMarker,
                      expectedWatchChanges, cb) {
   var cctx = ctx.withCancel();
-  var stream = db.watch(ctx, tableName, rowPrefix, resumeMarker);
+  var stream = resumeMarker === undefined ?
+    db.watch(ctx, tableName, rowPrefix) :
+    db.watch(ctx, tableName, rowPrefix, resumeMarker);
 
   async.timesSeries(expectedWatchChanges.length, function(i, next) {
     stream.once('data', function(gotWatchChange) {