Adding watch support
Change-Id: If8939c631e5a92a23391d9ce5dfab8e97d302a0f
diff --git a/mocks/syncbase-wrapper.js b/mocks/syncbase-wrapper.js
index d3e314c..d65dace 100644
--- a/mocks/syncbase-wrapper.js
+++ b/mocks/syncbase-wrapper.js
@@ -10,51 +10,68 @@
var defineClass = require('../src/util/define-class');
//All periods are expressed in milliseconds.
-var SYNC_LOOP_PERIOD = 50;
-var WATCH_LOOP_PERIOD = 50;
+var SYNC_LOOP_PERIOD = 25;
+var WATCH_LOOP_PERIOD = 25;
var syncgroups = {};
-function update(a, b) {
+function updateWatchers(watchers, k, v) {
+ if (watchers) {
+ watchers.forEach(function(watcher) {
+ watcher.update(k, v);
+ });
+ }
+}
+
+function concatWatchers(a, b) {
+ if (!a) {
+ return b;
+ } else if (!b) {
+ return a;
+ }
+ return a.concat(b);
+}
+
+function update(a, b, key, parentWatchers) {
+ var watchers = concatWatchers(parentWatchers, b.watchers);
$.each(a, function(k, v) {
- if (k !== 'value' && k !== 'version') {
+ if (k !== 'value' && k !== 'version' && k !== 'watchers') {
var bv = b[k];
- if (bv) {
- update(v, bv);
- } else {
- b[k] = $.extend(true, {}, v);
+ if (!bv) {
+ bv = b[k] = {};
}
+ update(v, bv, key.concat(k), watchers);
}
});
- if (a.version > b.version) {
+ if (a.version > b.version ||
+ a.version !== undefined && b.version === undefined ||
+ a.version === b.version && a.value !== b.value /* initial diff */) {
b.value = a.value;
b.version = a.version;
+ updateWatchers(watchers, key, b.value);
}
}
function sync(a, b, prefixes) {
$.each(prefixes, function() {
- var suba = recursiveGet(a, this);
- var subb = recursiveGet(b, this);
+ var suba = recursiveCreate(a, this);
+ var subb = recursiveCreate(b, this);
- if (suba && subb) {
- update(suba, subb);
- update(subb, suba);
- } else if (!suba) {
- recursiveCopy(a, this, subb);
- } else if (!subb) {
- recursiveCopy(b, this, suba);
- }
+ update(suba.node, subb.node, this, subb.parentWatchers);
+ update(subb.node, suba.node, this, suba.parentWatchers);
});
+
+ a.endBatch();
+ b.endBatch();
}
function syncLoop() {
- $.each(syncgroups, function() {
+ $.each(syncgroups, function(i, sg) {
var prev;
- this.forEach(function(sb) {
+ sg.forEach(function(sb) {
if (prev) {
- sync(prev, sb, this.prefixes);
+ sync(prev, sb, sg.prefixes);
}
prev = sb;
@@ -74,7 +91,9 @@
}
function recursiveCreate(node, key) { //it's recursive in spirit
+ var parentWatchers;
$.each(key, function() {
+ parentWatchers = concatWatchers(parentWatchers, node.watchers);
var child = node[this];
if (!child) {
child = node[this] = {};
@@ -82,59 +101,78 @@
node = child;
});
- return node;
+ return {
+ node: node,
+ parentWatchers: parentWatchers
+ };
}
function recursiveSet(node, key, value) {
- node = recursiveCreate(node, key);
+ var target = recursiveCreate(node, key);
- node.value = value;
- advanceVersion(node);
+ target.node.value = value;
+ advanceVersion(target.node);
+ updateWatchers(
+ concatWatchers(target.parentWatchers, node.watchers), key, value);
}
-function recursiveCopy(node, key, content) {
- $.extend(true, recursiveCreate(node, key), content);
-}
-
-function recursiveGet(node, key) {
+function recursiveGet(node, key, parentWatchers) {
$.each(key, function() {
if (!node) {
return false;
}
+ parentWatchers = concatWatchers(parentWatchers, node.watchers);
node = node[this];
});
- return node;
+ return {
+ node: node,
+ parentWatchers: parentWatchers
+ };
}
-function recursiveDelete(node, key) {
+function recursiveDelete(node, key, parentWatchers) {
+ parentWatchers = parentWatchers || [];
if (key) {
- node = recursiveGet(node, key);
+ var target = recursiveGet(node, key, parentWatchers);
+ node = target.node;
+ parentWatchers = target.parentWatchers;
}
if (node) {
- delete node.value;
- advanceVersion(node);
+ var watchers = concatWatchers(parentWatchers, node.watchers);
+
+ if (node.value !== undefined) {
+ delete node.value;
+ advanceVersion(node);
+ updateWatchers(watchers, key);
+ } else {
+ advanceVersion(node);
+ }
$.each(node, function(key, value) {
- if (key !== 'version') {
- recursiveDelete(value);
+ if (key !== 'version' && key !== 'watchers') {
+ recursiveDelete(value, null, watchers);
}
});
}
}
-function extractData(repo) {
+function extractData(repo, onData, fullKey) {
var data;
+ fullKey = fullKey || [];
$.each(repo, function(k, v) {
if (k === 'value') {
- if (typeof data === 'object') {
- if (v !== undefined) {
+ if (v !== undefined) {
+ if (typeof data === 'object') {
data._ = v;
+ } else {
+ data = v;
}
- } else {
- data = v;
+ if (onData) {
+ onData(fullKey, v);
+ }
}
- } else if (k !== 'version') {
- var value = extractData(v);
+ } else if (k !== 'version' && k !== 'watchers') {
+ var value = extractData(v, onData, fullKey.concat(k));
if (value !== undefined) {
if (data === undefined) {
data = {};
@@ -173,17 +211,21 @@
put: function(k, v) {
recursiveSet(this.repo, k, v);
+ this.repo.endBatch();
return Promise.resolve();
},
delete: function(k) {
recursiveDelete(this.repo, k);
+ this.repo.endBatch();
return Promise.resolve();
},
+ // TODO(rosswang): transitional
getData: function() {
return extractData(this.repo) || {};
},
+ // TODO(rosswang): end transitional
syncgroup: function(sgAdmin, name) {
var repo = this.repo;
@@ -243,26 +285,111 @@
return sgp;
},
+ getRawWatched: function(prefix, pullHandler, streamHandler) {
+ var target = recursiveCreate(this.repo, prefix);
+ extractData(target.node, pullHandler.onData, prefix);
+ this.registerHandlers(target.node, streamHandler);
+ return Promise.resolve();
+ },
+
+ // TODO(rosswang): transitional
refresh: function() {
this.onUpdate(this.getData());
}
+ // TODO(rosswang): end transitional
},
+ privates: {
+ watcher: defineClass.innerClass({
+ publics: {
+ update: function(k, v) {
+ this.dispatchLast(true);
+ this.lastOp = {
+ key: k,
+ value: v
+ };
+ this.outer.opBatch.add(this.ifc);
+ },
+
+ endBatch: function() {
+ if (this.dispatchLast(false) && this.streamHandler.onBatchEnd) {
+ try {
+ this.streamHandler.onBatchEnd();
+ } catch (err) {
+ this.streamHandler.onError(err);
+ }
+ }
+ }
+ },
+
+ privates: {
+ dispatchLast: function(continued) {
+ if (this.lastOp) {
+ try {
+ if (this.lastOp.value === undefined) {
+ if (this.streamHandler.onDelete) {
+ this.streamHandler.onDelete(this.lastOp.key, continued);
+ }
+ } else {
+ if (this.streamHandler.onPut) {
+ this.streamHandler.onPut(
+ this.lastOp.key, this.lastOp.value, continued);
+ }
+ }
+ } catch (err) {
+ this.streamHandler.onError(err);
+ }
+ return true;
+ } else {
+ return false;
+ }
+ }
+ },
+
+ init: function(streamHandler) {
+ this.streamHandler = streamHandler;
+ }
+ }),
+
+ registerHandlers: function(node, streamHandler) {
+ var watcher = this.watcher(streamHandler);
+
+ if (node.watchers) {
+ node.watchers.push(watcher);
+ } else {
+ node.watchers = [watcher];
+ }
+ }
+ },
+
+ // TODO(rosswang): transitional
events: {
onError: 'memory',
onUpdate: 'memory'
},
+ // TODO(rosswang): end transitional
init: function() {
var self = this;
- this.repo = {};
+ var opBatch = this.opBatch = new Set();
+ this.repo = {
+ endBatch: function() {
+ opBatch.forEach(function(watcher) {
+ watcher.endBatch();
+ });
+ opBatch.clear();
+ }
+ };
+
+ // TODO(rosswang): transitional
function watchLoop() {
self.refresh();
setTimeout(watchLoop, WATCH_LOOP_PERIOD);
}
process.nextTick(watchLoop);
+ // TODO(rosswang): end transitional
}
});
diff --git a/src/sync-util/deferred-sb-wrapper.js b/src/sync-util/deferred-sb-wrapper.js
index 8c268bc..616f3c3 100644
--- a/src/sync-util/deferred-sb-wrapper.js
+++ b/src/sync-util/deferred-sb-wrapper.js
@@ -2,10 +2,26 @@
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
+require('es6-shim');
+
+var $ = require('../util/jquery');
var defineClass = require('../util/define-class');
+var SyncbaseWrapper = require('../vanadium-wrapper/syncbase-wrapper');
+
+var defs = {};
+
+$.each(SyncbaseWrapper.ifc, function(method) {
+ defs[method] = function() {
+ var args = arguments;
+ return this.sbPromise.then(function(sb) {
+ return sb[method].apply(sb, args);
+ });
+ };
+});
var DeferredSbWrapper = defineClass({
- publics: {
+ // TODO(rosswang): extend = transitional
+ publics: $.extend(defs, {
batch: function(fn) {
this.manageWrite(this.sbPromise.then(function(syncbase) {
return syncbase.batch(fn);
@@ -35,8 +51,9 @@
return syncbase.pull(prefix);
});
}
- },
+ }),
+ // TODO(rosswang): transitional
privates: {
manageWrite: function(promise) {
var writes = this.writes;
@@ -63,18 +80,22 @@
onError: 'memory',
onUpdate: ''
},
+ // TODO(rosswang): end transitional
init: function(sbPromise) {
+ this.sbPromise = sbPromise;
+
+ // TODO(rosswang): transitional
var self = this;
this.writes = new Set();
- this.sbPromise = sbPromise;
sbPromise.then(function(syncbase) {
syncbase.onError.add(self.onError);
syncbase.onUpdate.add(self.processUpdates);
}).catch(this.onError);
+ // TODO(rosswang): end transitional
}
});
-module.exports = DeferredSbWrapper;
\ No newline at end of file
+module.exports = DeferredSbWrapper;
diff --git a/src/util/define-class.js b/src/util/define-class.js
index 0873408..5d1bf7e 100644
--- a/src/util/define-class.js
+++ b/src/util/define-class.js
@@ -64,7 +64,7 @@
if ($.isArray(def.events)) {
$.each(def.events, function(i, event) {
if ($.type(event) === 'string') {
- defineEvent(pthis, ifc, event);
+ pthis[event] = defineEvent(ifc, event);
} else {
defineEventsFromObject(pthis, ifc, event);
}
@@ -97,6 +97,10 @@
$.extend(constructor, def.statics);
}
+ // The function bodies aren't actually useful but the function objects provide
+ // useful reflective properties.
+ constructor.ifc = def.publics;
+
return constructor;
}
@@ -166,35 +170,37 @@
return proxy;
}
-function defineEvent(pthis, ifc, name, flags) {
+defineClass.event = defineEvent;
+
+function defineEvent(ifc, name, flags) {
var dispatcher = $.Callbacks(flags);
//Use polyBind on function that fires to add the callable syntactic sugar
- var callableDispatcher = pthis[name] = polyBind(function() {
+ var callableDispatcher = polyBind(function() {
dispatcher.fireWith.call(dispatcher, ifc, arguments);
}, dispatcher, dispatcher, false);
- if (flags && flags.indexOf('private') > -1) {
- return;
+ if (!(flags && flags.indexOf('private') > -1)) {
+ if (flags && flags.indexOf('public') > -1) {
+ ifc[name] = callableDispatcher;
+ } else {
+ var publicEvent = {};
+ /* We'll want the context to actually be callableDispatcher even though
+ * the interface and functionality of dispatcher suffice so that we can
+ * late-bind to the instance exposed to private this. */
+ polyBind(publicEvent, callableDispatcher,
+ ['disabled', 'fired', 'has', 'locked'], true);
+ polyReflexiveLateBind(publicEvent, callableDispatcher,
+ ['add', 'disable', 'empty', 'lock', 'remove']);
+
+ ifc[name] = publicEvent;
+ }
}
- if (flags && flags.indexOf('public') > -1) {
- ifc[name] = callableDispatcher;
- } else {
- var publicEvent = {};
- /* We'll want the context to actually be callableDispatcher even though
- * the interface and functionality of dispatcher suffice so that we can
- * late-bind to the instance exposed to private this. */
- polyBind(publicEvent, callableDispatcher,
- ['disabled', 'fired', 'has', 'locked'], true);
- polyReflexiveLateBind(publicEvent, callableDispatcher,
- ['add', 'disable', 'empty', 'lock', 'remove']);
-
- ifc[name] = publicEvent;
- }
+ return callableDispatcher;
}
function defineEventsFromObject(pthis, ifc, events) {
$.each(events, function(event, flags) {
- defineEvent(pthis, ifc, event, flags);
+ pthis[event] = defineEvent(ifc, event, flags);
});
}
diff --git a/src/vanadium-wrapper/syncbase-wrapper.js b/src/vanadium-wrapper/syncbase-wrapper.js
index 882dd2d..59b4555 100644
--- a/src/vanadium-wrapper/syncbase-wrapper.js
+++ b/src/vanadium-wrapper/syncbase-wrapper.js
@@ -44,6 +44,10 @@
return key.join('.');
}
+function splitKey(key) {
+ return key.split('.');
+}
+
/**
* Translate Syncbase hierarchical keys to object structure for easier
* processing. '.' is chosen as the separator; '/' is reserved in Syncbase.
@@ -52,11 +56,9 @@
* need regex escaping.
*/
function recursiveSet(root, key, value) {
- var matches = /\.?([^\.]*)(.*)/.exec(key);
- var member = matches[1];
- var remaining = matches[2];
+ var member = key[0];
- if (remaining) {
+ if (key.length > 1) {
var child = root[member];
if (!child) {
child = root[member] = {};
@@ -64,7 +66,7 @@
child = root[member] = { _: child };
}
- recursiveSet(child, remaining, value);
+ recursiveSet(child, key.slice(1), value);
} else {
var obj = root[member];
if (obj) {
@@ -75,6 +77,22 @@
}
}
+function recursiveDelete(root, key) {
+ var member = key[0];
+
+ if (key.length > 1) {
+ var child = root[member];
+ if (typeof child === 'object') {
+ recursiveDelete(child, key.slice(1));
+ if ($.isEmptyObject(child)) {
+ delete root[member];
+ }
+ }
+ } else {
+ delete root[member];
+ }
+}
+
var SG_MEMBER_INFO = new syncbase.nosql.SyncgroupMemberInfo();
// TODO(rosswang): generalize this
@@ -168,6 +186,8 @@
return this.manageWrite(this.standardDelete(this.deleteFromSyncbase, k));
},
+ // TODO(rosswang): transitional
+
getData: function() {
return this.data;
},
@@ -260,7 +280,7 @@
* Leave this handler attached but no-oping to drain the stream.
*/
} else {
- recursiveSet(newData, row[0], row[1]);
+ recursiveSet(newData, splitKey(row[0]), row[1]);
}
}).on('error', reject);
}).catch(function(err) {
@@ -273,6 +293,8 @@
}
},
+ // TODO(rosswang): end transitional
+
syncgroup: function(sgAdmin, name) {
var self = this;
@@ -384,10 +406,197 @@
};
return sgp;
- }
+ },
+
+ /**
+ * @return {
+ * data,
+ * onChange*(key, ?value, continued),
+ * onUpdate*(data),
+ * onError*(err),
+ * onClose*(?err)
+ * }
+ */
+ getWatchedObject: function(prefix) {
+ var result = {
+ data: {}
+ };
+
+ var onChange = defineClass.event(result, 'onChange');
+ var onUpdate = defineClass.event(result, 'onUpdate');
+ var onError = defineClass.event(result, 'onError');
+ var onClose = defineClass.event(result, 'onClose', 'memory');
+
+ function put(k, v) {
+ recursiveSet(result.data, k, v);
+ }
+
+ return this.getRawWatched(prefix, {
+ onData: put
+ }, {
+ onPut: function(k, v, continued) {
+ put(k, v);
+ onChange(k, v, continued);
+ },
+ onDelete: function(k, continued) {
+ recursiveDelete(result.data, k);
+ onChange(k, null, continued);
+ },
+ onBatchEnd: function() {
+ onUpdate(result.data);
+ },
+ onError: onError,
+ onClose: onClose
+ }).then(function() {
+ return result;
+ });
+ },
+
+ /**
+ * Pulls data from Syncbase and registers watch handlers. Returns a promise
+ * resolving after the initial pull.
+ *
+ * @param pullHandler { onData(key, value), onError(err) }
+ * @param streamHandler {
+ * ?onPut(key, value, continued),
+ * ?onDelete(key, continued),
+ * ?onBatchEnd(),
+ * onError(err),
+ * onClose(?err)
+ * }; These are callbacks rather than events to guarantee that no updates
+ * are missed. `continued` indicates whether a change is followed by more
+ * changes in same batch.
+ * @return a promise resolving after the initial pull completes. Watch
+ * callbacks may continue to be called until onClose.
+ */
+ getRawWatched: function(prefix, pullHandler, streamHandler) {
+ var self = this;
+
+ var resumeMarker;
+
+ var opts = new syncbase.nosql.BatchOptions();
+ return self.runInBatch(self.context, self.db, opts, function(db, cb) {
+ Promise.all([
+ self.pull2(db, prefix, pullHandler.onData, pullHandler.onError),
+ promisify(db.getResumeMarker.bind(db))(self.context)
+ ]).then(function(args) {
+ resumeMarker = args[1];
+ cb('abort');
+ }, cb);
+ }).catch(function(err) {
+ if (err !== 'abort') {
+ throw err;
+ }
+
+ var stream = self.db.watch(self.context, 't', joinKey(prefix),
+ resumeMarker, streamHandler.onClose);
+ stream.on('data', function(change) {
+ try {
+ switch(change.changeType) {
+ case 'put':
+ new Promise(function(resolve, reject) {
+ if (streamHandler.onPut) {
+ change.getValue(function(err, value) {
+ if (err) {
+ reject(err);
+ } else {
+ resolve(streamHandler.onPut(splitKey(change.rowName),
+ value, change.continued));
+ }
+ });
+ } else {
+ resolve();
+ }
+ }).then(function() {
+ if (!change.continued && streamHandler.onBatchEnd) {
+ return streamHandler.onBatchEnd();
+ }
+ }).catch(streamHandler.onError);
+ break;
+ case 'delete':
+ new Promise(function(resolve, reject) {
+ if (streamHandler.onDelete) {
+ resolve(streamHandler.onDelete(splitKey(change.rowName),
+ change.continued));
+ } else {
+ resolve();
+ }
+ }).then(function() {
+ if (!change.continued && streamHandler.onBatchEnd) {
+ return streamHandler.onBatchEnd();
+ }
+ }).catch(streamHandler.onError);
+ break;
+ default:
+ streamHandler.onError(
+ new Error('Invalid change type ' + change.changeType));
+ }
+ } catch(err) {
+ streamHandler.onError(err);
+ }
+ }).on('error', streamHandler.onError);
+ });
+ },
},
privates: {
+ /**
+ * TODO(rosswang): transitional: pull2 => pull
+ *
+ * Handlers may continue to be called even after the promise has been
+ * rejected.
+ *
+ * @param onData handler callback that takes a key, value pair.
+ * @param onError handler callback that takes an error.
+ * @return a promise that resolves when the pull is complete or rejects if
+ * the pull failed or either handler threw.
+ */
+ pull2: function(db, prefix, onData, onError) {
+ var self = this;
+
+ return new Promise(function(resolve, reject) {
+ var isHeader = true;
+
+ var query = 'select k, v from t';
+ if (prefix) {
+ query += ' where k like "' + joinKey(prefix) + '%"';
+ }
+
+ db.exec(self.context, query, function(err) {
+ if (err) {
+ reject(err);
+ } else {
+ resolve();
+ }
+ }).on('data', function(row) {
+ if (isHeader) {
+ isHeader = false;
+ return;
+ }
+
+ try {
+ onData(splitKey(row[0]), row[1]);
+ } catch (err) {
+ reject(err);
+ }
+ }).on('error', function(err) {
+ if (!onError) {
+ reject(err);
+ } else {
+ try {
+ onError(err);
+ } catch (err2) {
+ reject(err2);
+ }
+ }
+ });
+ });
+ },
+
+ /* TODO(rosswang): Keep this around even though the dirty flag and write
+ * records are not used since watch integration, because there is still a
+ * potential race condition; I'm just not particularly sure how to deal with
+ * it yet. If it turns out we don't have to worry about it, delete this. */
manageWrite: function(promise) {
var writes = this.writes;
@@ -417,10 +626,12 @@
constants: [ 'mountName' ],
+ // TODO(rosswang): transitional
events: {
onError: 'memory',
onUpdate: '',
},
+ // TODO(rosswang): end transitional
init: function(context, db, mountName) {
// TODO(rosswang): mountName probably won't be necessary after syncgroup
@@ -437,8 +648,7 @@
this.putToSyncbase = promisify(this.t.put.bind(this.t));
this.deleteFromSyncbase = promisify(this.t.deleteRange.bind(this.t));
- // Start the watch loop to periodically poll for changes from sync.
- // TODO(rosswang): Remove this once we have client watch.
+ // TODO(rosswang): transitional
function watchLoop() {
if (!self.pull.current) {
self.refresh().catch(self.onError);
@@ -446,6 +656,7 @@
setTimeout(watchLoop, 500);
}
process.nextTick(watchLoop);
+ // TODO(rosswang): end transitional
}
});
diff --git a/test/travel.js b/test/travel.js
index b4ae816..19ec9e6 100644
--- a/test/travel.js
+++ b/test/travel.js
@@ -30,7 +30,7 @@
*/
var STABLE_SLA = 2500;
var SYNC_SLA = MockSyncbaseWrapper.SYNC_SLA;
-var DEVICE_DISCOVERY_SLA = 5000;
+var DEVICE_DISCOVERY_SLA = 3000;
function cleanDom() {
$('body').empty();
diff --git a/test/vanadium-wrapper.js b/test/vanadium-wrapper.js
index da67958..dbe22ff 100644
--- a/test/vanadium-wrapper.js
+++ b/test/vanadium-wrapper.js
@@ -32,7 +32,7 @@
return context;
},
function(err) {
- t.fail('init error: ' + err);
+ t.fail('init error: ' + err.stack);
});
mockVanadium.finishInit(null, mockRuntime);