projects/todos: Using the syncbase's `watch` API instead of a
polling-based watch mechanism.

Change-Id: Ida2c0d7f7fe9fc9b88e75f2c3f298b887ed9e1d9
diff --git a/browser/index.js b/browser/index.js
index 692c3d2..0984e13 100644
--- a/browser/index.js
+++ b/browser/index.js
@@ -698,6 +698,13 @@
       });
     });
 
+    disp.on('watchError', function(err) {
+      alertOnError(new Error(
+        'Error occurred in Syncbase watch mechanism. Data may be stale. ' +
+        'Please refresh the page.\n' + err)
+      );
+    });
+
     // Load initial lists and todos. Note that changes can come in concurrently
     // via sync.
     this.updateLists_(function(err) {
diff --git a/browser/syncbase_dispatcher.js b/browser/syncbase_dispatcher.js
index 3b18dc4..ee4a60b 100644
--- a/browser/syncbase_dispatcher.js
+++ b/browser/syncbase_dispatcher.js
@@ -38,17 +38,34 @@
 module.exports = SyncbaseDispatcher;
 
 function SyncbaseDispatcher(rt, db) {
+  var that = this;
   Dispatcher.call(this);
   this.rt_ = rt;
   this.ctx_ = rt.getContext();
   this.db_ = db;
   this.tb_ = db.table('tb');
 
-  // Start the watch loop to periodically poll for changes from sync.
-  // TODO(sadovsky): Remove this once we have client watch.
-  var that = this;
-  process.nextTick(function() {
-    that.watchLoop_();
+  // Initialize Syncbase watch
+  var errorCb = function(err) {
+    if (err) {
+      this.emit('watchError', err);
+    }
+  }
+  this.db_.getResumeMarker(that.ctx_, function(err, marker) {
+    if (err) { cb(err); }
+    var watchStream = that.db_.watch(that.ctx_, that.tb_.name, '', marker, errorCb);
+
+    // TODO(aghassemi): Ideally the app would update its in-memory data
+    // structures directly from the watch stream, but since SyncGroup changes
+    // do not yet show up on the watch stream, it can't.
+    watchStream.on('data', function(change) {
+      // TODO(aghassemi): fromSync should be change.fromSync but due to
+      // issue v.io/i/685 we set it to true for now.
+      var fromSync = true;
+      that.emit('change', fromSync);
+    });
+
+    watchStream.on('error', errorCb);
   });
 }
 
@@ -209,10 +226,10 @@
   console.assert(!list._id);
   var listId = newListKey();
   var v = marshal(list);
-  this.tb_.put(ctx, listId, v, this.maybeEmit_(function(err) {
+  this.tb_.put(ctx, listId, v, function(err) {
     if (err) return cb(err);
     return cb(null, listId);
-  }, listId));
+  });
 });
 
 define('editListName', function(ctx, listId, name, cb) {
@@ -242,12 +259,12 @@
         that.addTagImpl_(ctx, tb, todoId, tag, cb);
       }, cb);
     });
-  }, this.maybeEmit_(cb, todoId));
+  }, cb);
 });
 
 define('removeTodo', function(ctx, todoId, cb) {
   // TODO(ivanpi): Also delete corresponding tags.
-  this.tb_.row(todoId).delete(ctx, this.maybeEmit_(cb, todoId));
+  this.tb_.row(todoId).delete(ctx, cb);
 });
 
 define('editTodoText', function(ctx, todoId, text, cb) {
@@ -263,14 +280,14 @@
 });
 
 define('addTag', function(ctx, todoId, tag, cb) {
-  this.addTagImpl_(ctx, this.tb_, todoId, tag, this.maybeEmit_(cb, todoId));
+  this.addTagImpl_(ctx, this.tb_, todoId, tag, cb);
 });
 
 define('removeTag', function(ctx, todoId, tag, cb) {
   // NOTE: Table.delete is awkward (it takes a range), so instead we use
   // Row.delete. It would be nice for Table.delete to operate on a single row
   // and have a separate Table.deleteRowRange.
-  this.tb_.row(tagKey(todoId, tag)).delete(ctx, this.maybeEmit_(cb, todoId));
+  this.tb_.row(tagKey(todoId, tag)).delete(ctx, cb);
 });
 
 ////////////////////////////////////////
@@ -378,104 +395,17 @@
 };
 
 ////////////////////////////////////////
-// Polling-based watch
-
-// Random number, used to implement watch. Each client writes to their own watch
-// key to signify that they've written new data, and each client periodically
-// polls all watch keys to see if anything has changed.
-var clientId = util.uid();
-function watchPrefix(listId) {
-  return join(listId, 'watch');
-}
-function watchKey(listId) {
-  return join(watchPrefix(listId), clientId);
-}
-
-var seq = 0;  // for our own writes
-var prevVersions = null;  // map of list id to version string
-
-// Increments stored seq for this client.
-define('bumpSeq_', function(ctx, listId, cb) {
-  seq++;
-  this.tb_.put(ctx, watchKey(listId), seq, cb);
-});
-
-// Returns a watch seq map for the given list id.
-define('getWatchSeqMap_', function(ctx, listId, cb) {
-  this.getRows_(ctx, watchPrefix(listId), function(err, rows) {
-    if (err) return cb(err);
-    var seqMap = {};  // map of client id to seq
-    _.forEach(rows, function(row) {
-      var parts = row.key.split(SEP);
-      console.assert(parts.length === 3);  // <listId>/watch/<clientId>
-      seqMap[parts[2]] = row.value;
-    });
-    cb(null, seqMap);
-  });
-});
-
-// Returns true if any data has arrived via sync, false if not.
-define('checkForChanges_', function(ctx, cb) {
-  var that = this;
-  this.getListsOnly_(ctx, function(err, lists) {
-    if (err) return cb(err);
-    // Build a map of list id to current version.
-    var currVersions = {};
-    var listIds = _.pluck(lists, '_id');
-    async.each(listIds, function(listId, cb) {
-      that.getWatchSeqMap_(ctx, listId, function(err, seqMap) {
-        if (err) return cb(err);
-        // Remove self from seqMap, join the rest into a version string.
-        delete seqMap[clientId];
-        var strs = _.map(seqMap, function(v, k) {
-          return k + ':' + v;
-        });
-        currVersions[listId] = strs.sort().join(',');
-        cb();
-      });
-    }, function(err) {
-      if (err) return cb(err);
-      // Note that _.isEqual performs a deep comparison.
-      var changed = prevVersions && !_.isEqual(currVersions, prevVersions);
-      prevVersions = currVersions;
-      cb(null, changed);
-    });
-  });
-});
-
-// Runs checkForChanges_ periodically and emits a 'change' event whenever a
-// change (from remote peer) is detected.
-SyncbaseDispatcher.prototype.watchLoop_ = function() {
-  var that = this;
-  var ctx = wt(this.ctx_).withValue(SILENT, 1);
-  this.checkForChanges_(ctx, function(err, changed) {
-    if (err) {
-      console.log('checkForChanges_ failed: ' + err);
-    } else if (changed) {
-      console.log('checkForChanges_ detected a change');
-      that.emit('change', true);
-    }
-    window.setTimeout(that.watchLoop_.bind(that), 500);
-  });
-};
-
-////////////////////////////////////////
 // Internal helpers
 
-// TODO(sadovsky): Drop this method once we have client watch.
-SyncbaseDispatcher.prototype.maybeEmit_ = function(cb, key) {
+// TODO(aghassemi): Remove this once changes to SyncGroups are included
+// in the watch stream.
+SyncbaseDispatcher.prototype.maybeEmit_ = function(cb) {
   var that = this;
   cb = cb || noop;
   return function(err) {
     cb.apply(null, arguments);
     if (err) return;
     that.emit('change');
-    if (key) {
-      // Run bumpSeq_ in the background.
-      that.bumpSeq_(keyToListId(key), function(err) {
-        if (err) console.log('bumpSeq_ failed: ' + err);
-      });
-    }
   };
 };
 
@@ -545,5 +475,5 @@
       var newValue = marshal(updateFn(unmarshal(value)));
       tb.put(wn(ctx, 'put:' + key), key, newValue, cb);
     });
-  }, this.maybeEmit_(cb, key));
+  }, cb);
 });