projects/todos: Using the syncbase's `watch` API instead of a
polling-based watch mechanism.
Change-Id: Ida2c0d7f7fe9fc9b88e75f2c3f298b887ed9e1d9
diff --git a/browser/index.js b/browser/index.js
index 692c3d2..0984e13 100644
--- a/browser/index.js
+++ b/browser/index.js
@@ -698,6 +698,13 @@
});
});
+ disp.on('watchError', function(err) {
+ alertOnError(new Error(
+ 'Error occurred in Syncbase watch mechanism. Data may be stale. ' +
+ 'Please refresh the page.\n' + err)
+ );
+ });
+
// Load initial lists and todos. Note that changes can come in concurrently
// via sync.
this.updateLists_(function(err) {
diff --git a/browser/syncbase_dispatcher.js b/browser/syncbase_dispatcher.js
index 3b18dc4..ee4a60b 100644
--- a/browser/syncbase_dispatcher.js
+++ b/browser/syncbase_dispatcher.js
@@ -38,17 +38,34 @@
module.exports = SyncbaseDispatcher;
function SyncbaseDispatcher(rt, db) {
+ var that = this;
Dispatcher.call(this);
this.rt_ = rt;
this.ctx_ = rt.getContext();
this.db_ = db;
this.tb_ = db.table('tb');
- // Start the watch loop to periodically poll for changes from sync.
- // TODO(sadovsky): Remove this once we have client watch.
- var that = this;
- process.nextTick(function() {
- that.watchLoop_();
+ // Initialize Syncbase watch
+ var errorCb = function(err) {
+ if (err) {
+ this.emit('watchError', err);
+ }
+ }
+ this.db_.getResumeMarker(that.ctx_, function(err, marker) {
+ if (err) { cb(err); }
+ var watchStream = that.db_.watch(that.ctx_, that.tb_.name, '', marker, errorCb);
+
+ // TODO(aghassemi): Ideally the app would update its in-memory data
+ // structures directly from the watch stream, but since SyncGroup changes
+ // do not yet show up on the watch stream, it can't.
+ watchStream.on('data', function(change) {
+ // TODO(aghassemi): fromSync should be change.fromSync but due to
+ // issue v.io/i/685 we set it to true for now.
+ var fromSync = true;
+ that.emit('change', fromSync);
+ });
+
+ watchStream.on('error', errorCb);
});
}
@@ -209,10 +226,10 @@
console.assert(!list._id);
var listId = newListKey();
var v = marshal(list);
- this.tb_.put(ctx, listId, v, this.maybeEmit_(function(err) {
+ this.tb_.put(ctx, listId, v, function(err) {
if (err) return cb(err);
return cb(null, listId);
- }, listId));
+ });
});
define('editListName', function(ctx, listId, name, cb) {
@@ -242,12 +259,12 @@
that.addTagImpl_(ctx, tb, todoId, tag, cb);
}, cb);
});
- }, this.maybeEmit_(cb, todoId));
+ }, cb);
});
define('removeTodo', function(ctx, todoId, cb) {
// TODO(ivanpi): Also delete corresponding tags.
- this.tb_.row(todoId).delete(ctx, this.maybeEmit_(cb, todoId));
+ this.tb_.row(todoId).delete(ctx, cb);
});
define('editTodoText', function(ctx, todoId, text, cb) {
@@ -263,14 +280,14 @@
});
define('addTag', function(ctx, todoId, tag, cb) {
- this.addTagImpl_(ctx, this.tb_, todoId, tag, this.maybeEmit_(cb, todoId));
+ this.addTagImpl_(ctx, this.tb_, todoId, tag, cb);
});
define('removeTag', function(ctx, todoId, tag, cb) {
// NOTE: Table.delete is awkward (it takes a range), so instead we use
// Row.delete. It would be nice for Table.delete to operate on a single row
// and have a separate Table.deleteRowRange.
- this.tb_.row(tagKey(todoId, tag)).delete(ctx, this.maybeEmit_(cb, todoId));
+ this.tb_.row(tagKey(todoId, tag)).delete(ctx, cb);
});
////////////////////////////////////////
@@ -378,104 +395,17 @@
};
////////////////////////////////////////
-// Polling-based watch
-
-// Random number, used to implement watch. Each client writes to their own watch
-// key to signify that they've written new data, and each client periodically
-// polls all watch keys to see if anything has changed.
-var clientId = util.uid();
-function watchPrefix(listId) {
- return join(listId, 'watch');
-}
-function watchKey(listId) {
- return join(watchPrefix(listId), clientId);
-}
-
-var seq = 0; // for our own writes
-var prevVersions = null; // map of list id to version string
-
-// Increments stored seq for this client.
-define('bumpSeq_', function(ctx, listId, cb) {
- seq++;
- this.tb_.put(ctx, watchKey(listId), seq, cb);
-});
-
-// Returns a watch seq map for the given list id.
-define('getWatchSeqMap_', function(ctx, listId, cb) {
- this.getRows_(ctx, watchPrefix(listId), function(err, rows) {
- if (err) return cb(err);
- var seqMap = {}; // map of client id to seq
- _.forEach(rows, function(row) {
- var parts = row.key.split(SEP);
- console.assert(parts.length === 3); // <listId>/watch/<clientId>
- seqMap[parts[2]] = row.value;
- });
- cb(null, seqMap);
- });
-});
-
-// Returns true if any data has arrived via sync, false if not.
-define('checkForChanges_', function(ctx, cb) {
- var that = this;
- this.getListsOnly_(ctx, function(err, lists) {
- if (err) return cb(err);
- // Build a map of list id to current version.
- var currVersions = {};
- var listIds = _.pluck(lists, '_id');
- async.each(listIds, function(listId, cb) {
- that.getWatchSeqMap_(ctx, listId, function(err, seqMap) {
- if (err) return cb(err);
- // Remove self from seqMap, join the rest into a version string.
- delete seqMap[clientId];
- var strs = _.map(seqMap, function(v, k) {
- return k + ':' + v;
- });
- currVersions[listId] = strs.sort().join(',');
- cb();
- });
- }, function(err) {
- if (err) return cb(err);
- // Note that _.isEqual performs a deep comparison.
- var changed = prevVersions && !_.isEqual(currVersions, prevVersions);
- prevVersions = currVersions;
- cb(null, changed);
- });
- });
-});
-
-// Runs checkForChanges_ periodically and emits a 'change' event whenever a
-// change (from remote peer) is detected.
-SyncbaseDispatcher.prototype.watchLoop_ = function() {
- var that = this;
- var ctx = wt(this.ctx_).withValue(SILENT, 1);
- this.checkForChanges_(ctx, function(err, changed) {
- if (err) {
- console.log('checkForChanges_ failed: ' + err);
- } else if (changed) {
- console.log('checkForChanges_ detected a change');
- that.emit('change', true);
- }
- window.setTimeout(that.watchLoop_.bind(that), 500);
- });
-};
-
-////////////////////////////////////////
// Internal helpers
-// TODO(sadovsky): Drop this method once we have client watch.
-SyncbaseDispatcher.prototype.maybeEmit_ = function(cb, key) {
+// TODO(aghassemi): Remove this once changes to SyncGroups are included
+// in the watch stream.
+SyncbaseDispatcher.prototype.maybeEmit_ = function(cb) {
var that = this;
cb = cb || noop;
return function(err) {
cb.apply(null, arguments);
if (err) return;
that.emit('change');
- if (key) {
- // Run bumpSeq_ in the background.
- that.bumpSeq_(keyToListId(key), function(err) {
- if (err) console.log('bumpSeq_ failed: ' + err);
- });
- }
};
};
@@ -545,5 +475,5 @@
var newValue = marshal(updateFn(unmarshal(value)));
tb.put(wn(ctx, 'put:' + key), key, newValue, cb);
});
- }, this.maybeEmit_(cb, key));
+ }, cb);
});