Implement client watch.

API modeled after the Go api in go/vcl/14421

Change-Id: I658e515876f6049e865b26b353be33b3628bf6b3
diff --git a/src/gen-vdl/v.io/syncbase/v23/services/syncbase/nosql/index.js b/src/gen-vdl/v.io/syncbase/v23/services/syncbase/nosql/index.js
index 1e5f77d..168e415 100644
--- a/src/gen-vdl/v.io/syncbase/v23/services/syncbase/nosql/index.js
+++ b/src/gen-vdl/v.io/syncbase/v23/services/syncbase/nosql/index.js
@@ -29,11 +29,11 @@
 var _type6 = new vdl.Type();
 var _type7 = new vdl.Type();
 var _typeBatchOptions = new vdl.Type();
+var _typeBlobFetchState = new vdl.Type();
+var _typeBlobFetchStatus = new vdl.Type();
 var _typeBlobRef = new vdl.Type();
 var _typeCrPolicy = new vdl.Type();
 var _typeCrRule = new vdl.Type();
-var _typeFetchState = new vdl.Type();
-var _typeFetchStatus = new vdl.Type();
 var _typeKeyValue = new vdl.Type();
 var _typePrefixPermissions = new vdl.Type();
 var _typeResolverType = new vdl.Type();
@@ -66,6 +66,12 @@
 _typeBatchOptions.kind = vdl.kind.STRUCT;
 _typeBatchOptions.name = "v.io/syncbase/v23/services/syncbase/nosql.BatchOptions";
 _typeBatchOptions.fields = [{name: "Hint", type: vdl.types.STRING}, {name: "ReadOnly", type: vdl.types.BOOL}];
+_typeBlobFetchState.kind = vdl.kind.ENUM;
+_typeBlobFetchState.name = "v.io/syncbase/v23/services/syncbase/nosql.BlobFetchState";
+_typeBlobFetchState.labels = ["Pending", "Locating", "Fetching", "Done"];
+_typeBlobFetchStatus.kind = vdl.kind.STRUCT;
+_typeBlobFetchStatus.name = "v.io/syncbase/v23/services/syncbase/nosql.BlobFetchStatus";
+_typeBlobFetchStatus.fields = [{name: "State", type: _typeBlobFetchState}, {name: "Received", type: vdl.types.INT64}, {name: "Total", type: vdl.types.INT64}];
 _typeBlobRef.kind = vdl.kind.STRING;
 _typeBlobRef.name = "v.io/syncbase/v23/services/syncbase/nosql.BlobRef";
 _typeCrPolicy.kind = vdl.kind.STRUCT;
@@ -74,12 +80,6 @@
 _typeCrRule.kind = vdl.kind.STRUCT;
 _typeCrRule.name = "v.io/syncbase/v23/services/syncbase/nosql.CrRule";
 _typeCrRule.fields = [{name: "TableName", type: vdl.types.STRING}, {name: "KeyPrefix", type: vdl.types.STRING}, {name: "Type", type: vdl.types.STRING}, {name: "Resolver", type: _typeResolverType}];
-_typeFetchState.kind = vdl.kind.ENUM;
-_typeFetchState.name = "v.io/syncbase/v23/services/syncbase/nosql.FetchState";
-_typeFetchState.labels = ["Pending", "Locating", "Fetching", "Done"];
-_typeFetchStatus.kind = vdl.kind.STRUCT;
-_typeFetchStatus.name = "v.io/syncbase/v23/services/syncbase/nosql.FetchStatus";
-_typeFetchStatus.fields = [{name: "State", type: _typeFetchState}, {name: "Received", type: vdl.types.UINT64}, {name: "Total", type: vdl.types.UINT64}];
 _typeKeyValue.kind = vdl.kind.STRUCT;
 _typeKeyValue.name = "v.io/syncbase/v23/services/syncbase/nosql.KeyValue";
 _typeKeyValue.fields = [{name: "Key", type: vdl.types.STRING}, {name: "Value", type: _type3}];
@@ -109,11 +109,11 @@
 _type6.freeze();
 _type7.freeze();
 _typeBatchOptions.freeze();
+_typeBlobFetchState.freeze();
+_typeBlobFetchStatus.freeze();
 _typeBlobRef.freeze();
 _typeCrPolicy.freeze();
 _typeCrRule.freeze();
-_typeFetchState.freeze();
-_typeFetchStatus.freeze();
 _typeKeyValue.freeze();
 _typePrefixPermissions.freeze();
 _typeResolverType.freeze();
@@ -122,16 +122,16 @@
 _typeSyncGroupMemberInfo.freeze();
 _typeSyncGroupSpec.freeze();
 module.exports.BatchOptions = (vdl.registry.lookupOrCreateConstructor(_typeBatchOptions));
+module.exports.BlobFetchState = {
+  PENDING: canonicalize.reduce(new (vdl.registry.lookupOrCreateConstructor(_typeBlobFetchState))('Pending', true), _typeBlobFetchState),
+  LOCATING: canonicalize.reduce(new (vdl.registry.lookupOrCreateConstructor(_typeBlobFetchState))('Locating', true), _typeBlobFetchState),
+  FETCHING: canonicalize.reduce(new (vdl.registry.lookupOrCreateConstructor(_typeBlobFetchState))('Fetching', true), _typeBlobFetchState),
+  DONE: canonicalize.reduce(new (vdl.registry.lookupOrCreateConstructor(_typeBlobFetchState))('Done', true), _typeBlobFetchState),
+};
+module.exports.BlobFetchStatus = (vdl.registry.lookupOrCreateConstructor(_typeBlobFetchStatus));
 module.exports.BlobRef = (vdl.registry.lookupOrCreateConstructor(_typeBlobRef));
 module.exports.CrPolicy = (vdl.registry.lookupOrCreateConstructor(_typeCrPolicy));
 module.exports.CrRule = (vdl.registry.lookupOrCreateConstructor(_typeCrRule));
-module.exports.FetchState = {
-  PENDING: canonicalize.reduce(new (vdl.registry.lookupOrCreateConstructor(_typeFetchState))('Pending', true), _typeFetchState),
-  LOCATING: canonicalize.reduce(new (vdl.registry.lookupOrCreateConstructor(_typeFetchState))('Locating', true), _typeFetchState),
-  FETCHING: canonicalize.reduce(new (vdl.registry.lookupOrCreateConstructor(_typeFetchState))('Fetching', true), _typeFetchState),
-  DONE: canonicalize.reduce(new (vdl.registry.lookupOrCreateConstructor(_typeFetchState))('Done', true), _typeFetchState),
-};
-module.exports.FetchStatus = (vdl.registry.lookupOrCreateConstructor(_typeFetchStatus));
 module.exports.KeyValue = (vdl.registry.lookupOrCreateConstructor(_typeKeyValue));
 module.exports.PrefixPermissions = (vdl.registry.lookupOrCreateConstructor(_typePrefixPermissions));
 module.exports.ResolverType = {
@@ -185,6 +185,12 @@
 ]);
 
 
+module.exports.BlobNotCommittedError = makeError('v.io/syncbase/v23/services/syncbase/nosql.BlobNotCommitted', actions.NO_RETRY, {
+  'en': '{1:}{2:} blob is not yet committed',
+}, [
+]);
+
+
 
 
 // Services:
@@ -210,7 +216,7 @@
 DatabaseWatcher.prototype._serviceDescription = {
   name: 'DatabaseWatcher',
   pkgPath: 'v.io/syncbase/v23/services/syncbase/nosql',
-  doc: "// Watch allows a client to watch for updates in the database. For each watched\n// request, the client will receive a reliable stream of watch events without\n// re-ordering. See watch.GlobWatcher for a detailed explanation of the\n// behavior.\n//\n// The watching is done by starting a streaming RPC. The argument to the RPC\n// contains the ResumeMarker that points to a particular place in the database\n// event log. The result stream consists of a never-ending sequence of Change\n// messages (until the call fails or is canceled). Each Change contains the\n// Name field in the form \"<tableName>/<rowKey>\" and the Value field of the\n// StoreChange type. If the client has no access to a row specified in a change,\n// that change is excluded from the result stream.\n//\n// The DatabaseWatcher is designed to be used in the following way:\n// 1) begin a read-only batch\n// 2) read all information your app needs\n// 3) read the ResumeMarker\n// 4) abort the batch\n// 5) start watching changes to the data using the ResumeMarker\n// In this configuration the client doesn't miss any changes.",
+  doc: "// DatabaseWatcher allows a client to watch for updates in the database.\n// For each watched request, the client will receive a reliable stream of watch\n// events without re-ordering. See watch.GlobWatcher for a detailed explanation\n// of the behavior.\n// TODO(rogulenko): Currently the only supported watch patterns are\n// 'table/row*'. Consider changing that.\n//\n// The watching is done by starting a streaming RPC. The argument to the RPC\n// contains the ResumeMarker that points to a particular place in the database\n// event log. The result stream consists of a never-ending sequence of Change\n// messages (until the call fails or is canceled). Each Change contains the\n// Name field in the form \"<tableName>/<rowKey>\" and the Value field of the\n// StoreChange type. If the client has no access to a row specified in a change,\n// that change is excluded from the result stream.\n//\n// The DatabaseWatcher is designed to be used in the following way:\n// 1) begin a read-only batch\n// 2) read all information your app needs\n// 3) read the ResumeMarker\n// 4) abort the batch\n// 5) start watching changes to the data using the ResumeMarker\n// In this configuration the client doesn't miss any changes.",
   embeds: [{
       name: 'GlobWatcher',
       pkgPath: 'v.io/v23/services/watch',
@@ -714,7 +720,7 @@
     outStream: {
       name: '',
       doc: '',
-      type: _typeFetchStatus
+      type: _typeBlobFetchStatus
     },
     tags: [canonicalize.reduce(new access.Tag("Read", true), new access.Tag()._type), ]
   },
@@ -857,6 +863,11 @@
 };
     
       
+Database.prototype.exec = function(ctx, serverCall, schemaVersion, query) {
+  throw new Error('Method Exec not implemented');
+};
+    
+      
 Database.prototype.beginBatch = function(ctx, serverCall, schemaVersion, bo) {
   throw new Error('Method BeginBatch not implemented');
 };
@@ -867,11 +878,6 @@
 };
     
       
-Database.prototype.exec = function(ctx, serverCall, schemaVersion, query) {
-  throw new Error('Method Exec not implemented');
-};
-    
-      
 Database.prototype.abort = function(ctx, serverCall, schemaVersion) {
   throw new Error('Method Abort not implemented');
 };
@@ -1015,7 +1021,7 @@
     {
       name: 'DatabaseWatcher',
       pkgPath: 'v.io/syncbase/v23/services/syncbase/nosql',
-      doc: "// Watch allows a client to watch for updates in the database. For each watched\n// request, the client will receive a reliable stream of watch events without\n// re-ordering. See watch.GlobWatcher for a detailed explanation of the\n// behavior.\n//\n// The watching is done by starting a streaming RPC. The argument to the RPC\n// contains the ResumeMarker that points to a particular place in the database\n// event log. The result stream consists of a never-ending sequence of Change\n// messages (until the call fails or is canceled). Each Change contains the\n// Name field in the form \"<tableName>/<rowKey>\" and the Value field of the\n// StoreChange type. If the client has no access to a row specified in a change,\n// that change is excluded from the result stream.\n//\n// The DatabaseWatcher is designed to be used in the following way:\n// 1) begin a read-only batch\n// 2) read all information your app needs\n// 3) read the ResumeMarker\n// 4) abort the batch\n// 5) start watching changes to the data using the ResumeMarker\n// In this configuration the client doesn't miss any changes."
+      doc: "// DatabaseWatcher allows a client to watch for updates in the database.\n// For each watched request, the client will receive a reliable stream of watch\n// events without re-ordering. See watch.GlobWatcher for a detailed explanation\n// of the behavior.\n// TODO(rogulenko): Currently the only supported watch patterns are\n// 'table/row*'. Consider changing that.\n//\n// The watching is done by starting a streaming RPC. The argument to the RPC\n// contains the ResumeMarker that points to a particular place in the database\n// event log. The result stream consists of a never-ending sequence of Change\n// messages (until the call fails or is canceled). Each Change contains the\n// Name field in the form \"<tableName>/<rowKey>\" and the Value field of the\n// StoreChange type. If the client has no access to a row specified in a change,\n// that change is excluded from the result stream.\n//\n// The DatabaseWatcher is designed to be used in the following way:\n// 1) begin a read-only batch\n// 2) read all information your app needs\n// 3) read the ResumeMarker\n// 4) abort the batch\n// 5) start watching changes to the data using the ResumeMarker\n// In this configuration the client doesn't miss any changes."
     },
     {
       name: 'SyncGroupManager',
@@ -1095,6 +1101,31 @@
     
       
     {
+    name: 'Exec',
+    doc: "// Exec executes a syncQL query and returns all results as specified by in the\n// query's select clause. Concurrency semantics are documented in model.go.",
+    inArgs: [{
+      name: 'schemaVersion',
+      doc: "",
+      type: vdl.types.INT32
+    },
+    {
+      name: 'query',
+      doc: "",
+      type: vdl.types.STRING
+    },
+    ],
+    outArgs: [],
+    inStream: null,
+    outStream: {
+      name: '',
+      doc: '',
+      type: _type6
+    },
+    tags: [canonicalize.reduce(new access.Tag("Read", true), new access.Tag()._type), ]
+  },
+    
+      
+    {
     name: 'BeginBatch',
     doc: "// BeginBatch creates a new batch. It returns an App-relative name for a\n// Database handle bound to this batch. If this Database is already bound to a\n// batch, BeginBatch() will fail with ErrBoundToBatch. Concurrency semantics\n// are documented in model.go.\n// TODO(sadovsky): make BatchOptions optional",
     inArgs: [{
@@ -1137,31 +1168,6 @@
     
       
     {
-    name: 'Exec',
-    doc: "// Exec executes a syncQL query and returns all results as specified by in the\n// query's select clause. Concurrency semantics are documented in model.go.",
-    inArgs: [{
-      name: 'schemaVersion',
-      doc: "",
-      type: vdl.types.INT32
-    },
-    {
-      name: 'query',
-      doc: "",
-      type: vdl.types.STRING
-    },
-    ],
-    outArgs: [],
-    inStream: null,
-    outStream: {
-      name: '',
-      doc: '',
-      type: _type6
-    },
-    tags: [canonicalize.reduce(new access.Tag("Read", true), new access.Tag()._type), ]
-  },
-    
-      
-    {
     name: 'Abort',
     doc: "// Abort notifies the server that any pending changes can be discarded.\n// It is not strictly required, but it may allow the server to release locks\n// or other resources sooner than if it was not called.\n// If this Database is not bound to a batch, Abort() will fail with\n// ErrNotBoundToBatch.",
     inArgs: [{
@@ -1582,7 +1588,7 @@
     outStream: {
       name: '',
       doc: '',
-      type: _typeFetchStatus
+      type: _typeBlobFetchStatus
     },
     tags: [canonicalize.reduce(new access.Tag("Read", true), new access.Tag()._type), ]
   },
@@ -1791,7 +1797,7 @@
       
     {
     name: 'DeleteRowRange',
-    doc: "// Delete deletes all rows in the given half-open range [start, limit). If\n// limit is \"\", all rows with keys >= start are included.",
+    doc: "// Delete deletes all rows in the given half-open range [start, limit). If\n// limit is \"\", all rows with keys >= start are included.\n// TODO(sadovsky): Delete prefix perms fully covered by the row range?",
     inArgs: [{
       name: 'schemaVersion',
       doc: "",
diff --git a/src/gen-vdl/v.io/v23/security/index.js b/src/gen-vdl/v.io/v23/security/index.js
index 5221584..de0e2eb 100644
--- a/src/gen-vdl/v.io/v23/security/index.js
+++ b/src/gen-vdl/v.io/v23/security/index.js
@@ -273,17 +273,17 @@
 
   module.exports.SHA512Hash = canonicalize.reduce(new (vdl.registry.lookupOrCreateConstructor(_typeHash))("SHA512", true), _typeHash);
 
-  module.exports.SignatureForMessageSigning = canonicalize.reduce(new (vdl.registry.lookupOrCreateConstructor(vdl.types.STRING))("S", true), vdl.types.STRING);
+  module.exports.SignatureForMessageSigning = canonicalize.reduce(new (vdl.registry.lookupOrCreateConstructor(vdl.types.STRING))("S1", true), vdl.types.STRING);
 
-  module.exports.SignatureForBlessingCertificates = canonicalize.reduce(new (vdl.registry.lookupOrCreateConstructor(vdl.types.STRING))("B", true), vdl.types.STRING);
+  module.exports.SignatureForBlessingCertificates = canonicalize.reduce(new (vdl.registry.lookupOrCreateConstructor(vdl.types.STRING))("B1", true), vdl.types.STRING);
 
-  module.exports.SignatureForDischarge = canonicalize.reduce(new (vdl.registry.lookupOrCreateConstructor(vdl.types.STRING))("D", true), vdl.types.STRING);
+  module.exports.SignatureForDischarge = canonicalize.reduce(new (vdl.registry.lookupOrCreateConstructor(vdl.types.STRING))("D1", true), vdl.types.STRING);
 
-  module.exports.SignatureForMessageSigningV1 = canonicalize.reduce(new (vdl.registry.lookupOrCreateConstructor(vdl.types.STRING))("S1", true), vdl.types.STRING);
+  module.exports.SignatureForMessageSigningV0 = canonicalize.reduce(new (vdl.registry.lookupOrCreateConstructor(vdl.types.STRING))("S", true), vdl.types.STRING);
 
-  module.exports.SignatureForBlessingCertificatesV1 = canonicalize.reduce(new (vdl.registry.lookupOrCreateConstructor(vdl.types.STRING))("B1", true), vdl.types.STRING);
+  module.exports.SignatureForBlessingCertificatesV0 = canonicalize.reduce(new (vdl.registry.lookupOrCreateConstructor(vdl.types.STRING))("B", true), vdl.types.STRING);
 
-  module.exports.SignatureForDischargeV1 = canonicalize.reduce(new (vdl.registry.lookupOrCreateConstructor(vdl.types.STRING))("D1", true), vdl.types.STRING);
+  module.exports.SignatureForDischargeV0 = canonicalize.reduce(new (vdl.registry.lookupOrCreateConstructor(vdl.types.STRING))("D", true), vdl.types.STRING);
 
 
 
diff --git a/src/nosql/database.js b/src/nosql/database.js
index 58c0993..febf66f 100644
--- a/src/nosql/database.js
+++ b/src/nosql/database.js
@@ -10,11 +10,14 @@
 // vanadium.vdl object.
 var unwrap = require('vanadium/src/vdl/type-util').unwrap;
 
-var BatchDatabase = require('./batch-database');
 var nosqlVdl = require('../gen-vdl/v.io/syncbase/v23/services/syncbase/nosql');
+var watchVdl = require('../gen-vdl/v.io/v23/services/watch');
+
+var BatchDatabase = require('./batch-database');
 var SyncGroup = require('./syncgroup');
 var Table = require('./table');
 var util = require('../util');
+var watch = require('./watch');
 
 /**
  * Database represents a collection of Tables. Batches, queries, sync, watch,
@@ -214,6 +217,80 @@
 };
 
 /**
+ * Watches for updates to the database. For each watch request, the client will
+ * receive a reliable stream of watch events without re-ordering.
+ *
+ * This method is designed to be used in the following way:
+ * 1) begin a read-only batch
+ * 2) read all information your app needs
+ * 3) read the ResumeMarker
+ * 4) abort the batch
+ * 5) start watching for changes to the data using the ResumeMarker
+ *
+ * In this configuration the client doesn't miss any changes.
+ *
+ * @param {module:vanadium.context.Context} ctx Vanadium context.
+ * @param {string} table Name of table to watch.
+ * @param {string} prefix Prefix of keys to watch.
+ * @param {module:syncbase.nosql.watch.ResumeMarker} resumeMarker ResumeMarker
+ * to resume watching from.
+ * @param {function} [cb] Optional callback that will be called after watch RPC
+ * finishes.
+ * @returns {stream} Stream of WatchChange objects.
+ */
+Database.prototype.watch = function(ctx, tableName, prefix, resumeMarker, cb) {
+  var globReq = new watchVdl.GlobRequest({
+    pattern: vanadium.naming.join(tableName, prefix + '*'),
+    resumeMarker: resumeMarker
+  });
+
+  var watchChangeEncoder = through2({
+    objectMode: true
+  }, function(change, enc, cb) {
+    var changeType;
+    switch (change.state) {
+      case watchVdl.Exists.val:
+        changeType = 'put';
+        break;
+      case watchVdl.DoesNotExist.val:
+        changeType = 'delete';
+        break;
+      default:
+        return cb(new Error('invalid change state ' + change.state));
+    }
+
+    var wc = new watch.WatchChange({
+      tableName: vanadium.naming.stripBasename(change.name),
+      rowName: vanadium.naming.basename(change.name),
+      changeType: changeType,
+      valueBytes: changeType === 'put' ? change.value.value : null,
+      resumeMarker: change.resumeMarker,
+      fromSync: change.value.fromSync,
+      continued: change.continued
+    });
+    return cb(null, wc);
+  });
+
+  var stream = this._wire(ctx).watchGlob(ctx, globReq, cb).stream;
+
+  var watchChangeStream = stream.pipe(watchChangeEncoder);
+  stream.on('error', function(err) {
+    watchChangeStream.emit('error', err);
+  });
+
+  return watchChangeStream;
+};
+
+/**
+ * Gets the ResumeMarker that points to the current end of the event log.
+ * @param {module:vanadium.context.Context} ctx Vanadium context.
+ * @param {function} cb Callback.
+ */
+Database.prototype.getResumeMarker = function(ctx, cb) {
+  this._wire(ctx).getResumeMarker(ctx, cb);
+};
+
+/**
  * Replaces the current Permissions for the Database.
  * @param {module:vanadium.context.Context} ctx Vanadium context.
  * @param {module:vanadium.security.access.Permissions} perms Permissions for
diff --git a/src/nosql/index.js b/src/nosql/index.js
index c589058..5bbab69 100644
--- a/src/nosql/index.js
+++ b/src/nosql/index.js
@@ -5,6 +5,7 @@
 var rowrange = require('./rowrange');
 var runInBatch = require('./batch');
 var vdl = require('../gen-vdl/v.io/syncbase/v23/services/syncbase/nosql');
+var watch = require('./watch');
 
 /**
  * @summary
@@ -16,8 +17,10 @@
 module.exports = {
   BatchOptions: vdl.BatchOptions,
   ReadOnlyBatchError: vdl.ReadOnlyBatchError,
+  ResumeMarker: watch.ResumeMarker,
   rowrange: rowrange,
   runInBatch: runInBatch,
   SyncGroupMemberInfo: vdl.SyncGroupMemberInfo,
-  SyncGroupSpec: vdl.SyncGroupSpec
+  SyncGroupSpec: vdl.SyncGroupSpec,
+  WatchChange: watch.WatchChange
 };
diff --git a/src/nosql/watch.js b/src/nosql/watch.js
new file mode 100644
index 0000000..dba87e2
--- /dev/null
+++ b/src/nosql/watch.js
@@ -0,0 +1,107 @@
+// Copyright 2015 The Vanadium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+var vom = require('vanadium').vom;
+
+var watchVdl = require('../gen-vdl/v.io/v23/services/watch');
+
+module.exports = {
+  ResumeMarker: watchVdl.ResumeMarker,
+  WatchChange: WatchChange
+};
+
+/**
+ * WatchChange represents the new value for a watched entity.
+ * @constructor
+ */
+function WatchChange(opts) {
+  /**
+   * @property tableName
+   * The name of the table that contains the changed row
+   */
+  Object.defineProperty(this, 'tableName', {
+    enumerable: true,
+    value: opts.tableName,
+    writable: false
+  });
+
+  /**
+   * @property rowName
+   * Name of the changed row.
+   */
+  Object.defineProperty(this, 'rowName', {
+    enumerable: true,
+    value: opts.rowName,
+    writable: false
+  });
+
+  /**
+   * @property changeType
+   * Describes the type of the change. If the changeType equals 'put', then the
+   * row exists in the table and the value contains the new value for this row.
+   * If the state equals 'delete', then the row was removed from the table.
+   */
+  Object.defineProperty(this, 'changeType', {
+    enumerable: true,
+    value: opts.changeType,
+    writable: false
+  });
+
+  /**
+   * @property valueBytes
+   * The new VOM-encoded value for the row if the changeType is 'put' or nil
+   * otherwise.
+   */
+  Object.defineProperty(this, 'valueBytes', {
+    enumerable: true,
+    value: opts.valueBytes,
+    writable: false
+  });
+
+  /**
+   * @property resumeMarker
+   * Provides a compact representation of all the messages that have been
+   * received by the caller for the given watch call.  This marker can be
+   * provided in the request message to allow the caller to resume the stream
+   * watching at a specific point without fetching the initial state.
+   */
+  Object.defineProperty(this, 'resumeMarker', {
+    enumerable: true,
+    value: opts.resumeMarker,
+    writable: false
+  });
+
+  /**
+   * @property fromSync
+   * Indicates whether the change came from sync. If fromSync is false, then
+   * the change originated from the local device.
+   */
+  Object.defineProperty(this, 'fromSync', {
+    enumerable: true,
+    value: opts.fromSync || false,
+    writable: false
+  });
+
+  /**
+   * @property continued
+   * If true, this WatchChange is followed by more WatchChanges that are in the
+   * same batch as this WatchChange
+   */
+  Object.defineProperty(this, 'continued', {
+    enumerable: true,
+    value: opts.continued || false,
+    writable: false
+  });
+}
+
+/**
+ * Decodes the new value of the watched element.
+ */
+WatchChange.prototype.getValue = function(cb) {
+  if (this.changeType === 'delete') {
+    return cb(new Error('invalid change type'));
+  }
+
+  vom.decode(this.valueBytes, false, null, cb);
+};
diff --git a/test/integration/test-watch.js b/test/integration/test-watch.js
new file mode 100644
index 0000000..fca1cec
--- /dev/null
+++ b/test/integration/test-watch.js
@@ -0,0 +1,132 @@
+// Copyright 2015 The Vanadium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+var async = require('async');
+var test = require('prova');
+var vom = require('vanadium').vom;
+
+var syncbase = require('../..');
+var WatchChange = syncbase.nosql.WatchChange;
+
+var testUtil = require('./util');
+var setupTable = testUtil.setupTable;
+var uniqueName = testUtil.uniqueName;
+
+// Tests the basic client watch functionality (no perms or batches).  First
+// does some puts and deletes, fetching a ResumeMarker after each operation.
+// Then calls 'watch' with different prefixes and ResumeMarkers and verifies
+// that the resulting stream contains the correct changes.
+test('basic client watch', function(t) {
+  setupTable(t, function(err, o) {
+    if (err) {
+      return t.end(err);
+    }
+
+    var ctx = o.ctx;
+    var db = o.database;
+    var table = o.table;
+
+    var row1Prefix = 'row-abc';
+    var row1 = table.row(uniqueName(row1Prefix));
+    var value1 = uniqueName('value');
+
+    var row2Prefix = 'row-a';
+    var row2 = table.row(uniqueName(row2Prefix));
+    var value2 = uniqueName('value');
+
+    var resumeMarkers = [];
+
+    function getAndAppendResumeMarker(cb) {
+      db.getResumeMarker(ctx, function(err, rm) {
+        if (err) {
+          return cb(err);
+        }
+        resumeMarkers.push(rm);
+        cb(null);
+      });
+    }
+
+    // Generate the data and resume markers.
+    async.waterfall([
+      // Initial state.
+      getAndAppendResumeMarker,
+
+      // Put to row1.
+      row1.put.bind(row1, ctx, value1),
+      getAndAppendResumeMarker,
+
+      // Delete row1.
+      row1.delete.bind(row1, ctx),
+      getAndAppendResumeMarker,
+
+      // Put to row2.
+      row2.put.bind(row2, ctx, value2),
+      getAndAppendResumeMarker
+    ], assertCorrectChanges);
+
+    function assertCorrectChanges(err) {
+      if (err) {
+        t.error(err);
+        return o.teardown(t.end);
+      }
+
+      var allExpectedChanges = [new WatchChange({
+        tableName: table.name,
+        rowName: row1.key,
+        changeType: 'put',
+        valueBytes: vom.encode(value1),
+        resumeMarker: resumeMarkers[1]
+      }), new WatchChange({
+        tableName: table.name,
+        rowName: row1.key,
+        changeType: 'delete',
+        valueBytes: null,
+        resumeMarker: resumeMarkers[2]
+      }), new WatchChange({
+        tableName: table.name,
+        rowName: row2.key,
+        changeType: 'put',
+        valueBytes: vom.encode(value2),
+        resumeMarker: resumeMarkers[3]
+      })];
+
+      async.series([
+        assertWatch.bind(null, t, ctx, db, table.name, row2Prefix,
+                         resumeMarkers[0], allExpectedChanges),
+        assertWatch.bind(null, t, ctx, db, table.name, row2Prefix,
+                         resumeMarkers[1], allExpectedChanges.slice(1)),
+        assertWatch.bind(null, t, ctx, db, table.name, row2Prefix,
+                         resumeMarkers[2], allExpectedChanges.slice(2)),
+
+        assertWatch.bind(null, t, ctx, db, table.name, row1Prefix,
+                         resumeMarkers[0], allExpectedChanges.slice(0,2)),
+        assertWatch.bind(null, t, ctx, db, table.name, row1Prefix,
+                         resumeMarkers[1], allExpectedChanges.slice(1,2)),
+      ], function(err) {
+        t.error(err);
+        o.teardown(t.end);
+      });
+    }
+  });
+});
+
+function assertWatch(t, ctx, db, tableName, rowPrefix, resumeMarker,
+                     expectedWatchChanges, cb) {
+  var cctx = ctx.withCancel();
+  var stream = db.watch(ctx, tableName, rowPrefix, resumeMarker);
+
+  async.timesSeries(expectedWatchChanges.length, function(i, next) {
+    stream.once('data', function(gotWatchChange) {
+      t.deepEqual(gotWatchChange, expectedWatchChanges[i]);
+
+      next(null);
+    });
+  }, function(err) {
+    cctx.finish();
+    if (err) {
+      return cb(err);
+    }
+    cb(null);
+  });
+}