Adding watch support

Change-Id: If8939c631e5a92a23391d9ce5dfab8e97d302a0f
diff --git a/mocks/syncbase-wrapper.js b/mocks/syncbase-wrapper.js
index d3e314c..d65dace 100644
--- a/mocks/syncbase-wrapper.js
+++ b/mocks/syncbase-wrapper.js
@@ -10,51 +10,68 @@
 var defineClass = require('../src/util/define-class');
 
 //All periods are expressed in milliseconds.
-var SYNC_LOOP_PERIOD = 50;
-var WATCH_LOOP_PERIOD = 50;
+var SYNC_LOOP_PERIOD = 25;
+var WATCH_LOOP_PERIOD = 25;
 
 var syncgroups = {};
 
-function update(a, b) {
+function updateWatchers(watchers, k, v) {
+  if (watchers) {
+    watchers.forEach(function(watcher) {
+      watcher.update(k, v);
+    });
+  }
+}
+
+function concatWatchers(a, b) {
+  if (!a) {
+    return b;
+  } else if (!b) {
+    return a;
+  }
+  return a.concat(b);
+}
+
+function update(a, b, key, parentWatchers) {
+  var watchers = concatWatchers(parentWatchers, b.watchers);
   $.each(a, function(k, v) {
-    if (k !== 'value' && k !== 'version') {
+    if (k !== 'value' && k !== 'version' && k !== 'watchers') {
       var bv = b[k];
-      if (bv) {
-        update(v, bv);
-      } else {
-        b[k] = $.extend(true, {}, v);
+      if (!bv) {
+        bv = b[k] = {};
       }
+      update(v, bv, key.concat(k), watchers);
     }
   });
 
-  if (a.version > b.version) {
+  if (a.version > b.version ||
+      a.version !== undefined && b.version === undefined ||
+      a.version === b.version && a.value !== b.value /* initial diff */) {
     b.value = a.value;
     b.version = a.version;
+    updateWatchers(watchers, key, b.value);
   }
 }
 
 function sync(a, b, prefixes) {
   $.each(prefixes, function() {
-    var suba = recursiveGet(a, this);
-    var subb = recursiveGet(b, this);
+    var suba = recursiveCreate(a, this);
+    var subb = recursiveCreate(b, this);
 
-    if (suba && subb) {
-      update(suba, subb);
-      update(subb, suba);
-    } else if (!suba) {
-      recursiveCopy(a, this, subb);
-    } else if (!subb) {
-      recursiveCopy(b, this, suba);
-    }
+    update(suba.node, subb.node, this, subb.parentWatchers);
+    update(subb.node, suba.node, this, suba.parentWatchers);
   });
+
+  a.endBatch();
+  b.endBatch();
 }
 
 function syncLoop() {
-  $.each(syncgroups, function() {
+  $.each(syncgroups, function(i, sg) {
     var prev;
-    this.forEach(function(sb) {
+    sg.forEach(function(sb) {
       if (prev) {
-        sync(prev, sb, this.prefixes);
+        sync(prev, sb, sg.prefixes);
       }
 
       prev = sb;
@@ -74,7 +91,9 @@
 }
 
 function recursiveCreate(node, key) { //it's recursive in spirit
+  var parentWatchers;
   $.each(key, function() {
+    parentWatchers = concatWatchers(parentWatchers, node.watchers);
     var child = node[this];
     if (!child) {
       child = node[this] = {};
@@ -82,59 +101,78 @@
     node = child;
   });
 
-  return node;
+  return {
+    node: node,
+    parentWatchers: parentWatchers
+  };
 }
 
 function recursiveSet(node, key, value) {
-  node = recursiveCreate(node, key);
+  var target = recursiveCreate(node, key);
 
-  node.value = value;
-  advanceVersion(node);
+  target.node.value = value;
+  advanceVersion(target.node);
+  updateWatchers(
+    concatWatchers(target.parentWatchers, node.watchers), key, value);
 }
 
-function recursiveCopy(node, key, content) {
-  $.extend(true, recursiveCreate(node, key), content);
-}
-
-function recursiveGet(node, key) {
+function recursiveGet(node, key, parentWatchers) {
   $.each(key, function() {
     if (!node) {
       return false;
     }
+    parentWatchers = concatWatchers(parentWatchers, node.watchers);
     node = node[this];
   });
-  return node;
+  return {
+    node: node,
+    parentWatchers: parentWatchers
+  };
 }
 
-function recursiveDelete(node, key) {
+function recursiveDelete(node, key, parentWatchers) {
+  parentWatchers = parentWatchers || [];
   if (key) {
-    node = recursiveGet(node, key);
+    var target = recursiveGet(node, key, parentWatchers);
+    node = target.node;
+    parentWatchers = target.parentWatchers;
   }
 
   if (node) {
-    delete node.value;
-    advanceVersion(node);
+    var watchers = concatWatchers(parentWatchers, node.watchers);
+
+    if (node.value !== undefined) {
+      delete node.value;
+      advanceVersion(node);
+      updateWatchers(watchers, key);
+    } else {
+      advanceVersion(node);
+    }
     $.each(node, function(key, value) {
-      if (key !== 'version') {
-        recursiveDelete(value);
+      if (key !== 'version' && key !== 'watchers') {
+        recursiveDelete(value, null, watchers);
       }
     });
   }
 }
 
-function extractData(repo) {
+function extractData(repo, onData, fullKey) {
   var data;
+  fullKey = fullKey || [];
   $.each(repo, function(k, v) {
     if (k === 'value') {
-      if (typeof data === 'object') {
-        if (v !== undefined) {
+      if (v !== undefined) {
+        if (typeof data === 'object') {
           data._ = v;
+        } else {
+          data = v;
         }
-      } else {
-        data = v;
+        if (onData) {
+          onData(fullKey, v);
+        }
       }
-    } else if (k !== 'version') {
-      var value = extractData(v);
+    } else if (k !== 'version' && k !== 'watchers') {
+      var value = extractData(v, onData, fullKey.concat(k));
       if (value !== undefined) {
         if (data === undefined) {
           data = {};
@@ -173,17 +211,21 @@
 
     put: function(k, v) {
       recursiveSet(this.repo, k, v);
+      this.repo.endBatch();
       return Promise.resolve();
     },
 
     delete: function(k) {
       recursiveDelete(this.repo, k);
+      this.repo.endBatch();
       return Promise.resolve();
     },
 
+    // TODO(rosswang): transitional
     getData: function() {
       return extractData(this.repo) || {};
     },
+    // TODO(rosswang): end transitional
 
     syncgroup: function(sgAdmin, name) {
       var repo = this.repo;
@@ -243,26 +285,111 @@
       return sgp;
     },
 
+    getRawWatched: function(prefix, pullHandler, streamHandler) {
+      var target = recursiveCreate(this.repo, prefix);
+      extractData(target.node, pullHandler.onData, prefix);
+      this.registerHandlers(target.node, streamHandler);
+      return Promise.resolve();
+    },
+
+    // TODO(rosswang): transitional
     refresh: function() {
       this.onUpdate(this.getData());
     }
+    // TODO(rosswang): end transitional
   },
 
+  privates: {
+    watcher: defineClass.innerClass({
+      publics: {
+        update: function(k, v) {
+          this.dispatchLast(true);
+          this.lastOp = {
+            key: k,
+            value: v
+          };
+          this.outer.opBatch.add(this.ifc);
+        },
+
+        endBatch: function() {
+          if (this.dispatchLast(false) && this.streamHandler.onBatchEnd) {
+            try {
+              this.streamHandler.onBatchEnd();
+            } catch (err) {
+              this.streamHandler.onError(err);
+            }
+          }
+        }
+      },
+
+      privates: {
+        dispatchLast: function(continued) {
+          if (this.lastOp) {
+            try {
+              if (this.lastOp.value === undefined) {
+                if (this.streamHandler.onDelete) {
+                  this.streamHandler.onDelete(this.lastOp.key, continued);
+                }
+              } else {
+                if (this.streamHandler.onPut) {
+                  this.streamHandler.onPut(
+                    this.lastOp.key, this.lastOp.value, continued);
+                }
+              }
+            } catch (err) {
+              this.streamHandler.onError(err);
+            }
+            return true;
+          } else {
+            return false;
+          }
+        }
+      },
+
+      init: function(streamHandler) {
+        this.streamHandler = streamHandler;
+      }
+    }),
+
+    registerHandlers: function(node, streamHandler) {
+      var watcher = this.watcher(streamHandler);
+
+      if (node.watchers) {
+        node.watchers.push(watcher);
+      } else {
+        node.watchers = [watcher];
+      }
+    }
+  },
+
+  // TODO(rosswang): transitional
   events: {
     onError: 'memory',
     onUpdate: 'memory'
   },
+  // TODO(rosswang): end transitional
 
   init: function() {
     var self = this;
 
-    this.repo = {};
+    var opBatch = this.opBatch = new Set();
 
+    this.repo = {
+      endBatch: function() {
+        opBatch.forEach(function(watcher) {
+          watcher.endBatch();
+        });
+        opBatch.clear();
+      }
+    };
+
+    // TODO(rosswang): transitional
     function watchLoop() {
       self.refresh();
       setTimeout(watchLoop, WATCH_LOOP_PERIOD);
     }
     process.nextTick(watchLoop);
+    // TODO(rosswang): end transitional
   }
 });
 
diff --git a/src/sync-util/deferred-sb-wrapper.js b/src/sync-util/deferred-sb-wrapper.js
index 8c268bc..616f3c3 100644
--- a/src/sync-util/deferred-sb-wrapper.js
+++ b/src/sync-util/deferred-sb-wrapper.js
@@ -2,10 +2,26 @@
 // Use of this source code is governed by a BSD-style
 // license that can be found in the LICENSE file.
 
+require('es6-shim');
+
+var $ = require('../util/jquery');
 var defineClass = require('../util/define-class');
+var SyncbaseWrapper = require('../vanadium-wrapper/syncbase-wrapper');
+
+var defs = {};
+
+$.each(SyncbaseWrapper.ifc, function(method) {
+  defs[method] = function() {
+    var args = arguments;
+    return this.sbPromise.then(function(sb) {
+      return sb[method].apply(sb, args);
+    });
+  };
+});
 
 var DeferredSbWrapper = defineClass({
-  publics: {
+  // TODO(rosswang): extend = transitional
+  publics: $.extend(defs, {
     batch: function(fn) {
       this.manageWrite(this.sbPromise.then(function(syncbase) {
         return syncbase.batch(fn);
@@ -35,8 +51,9 @@
         return syncbase.pull(prefix);
       });
     }
-  },
+  }),
 
+  // TODO(rosswang): transitional
   privates: {
     manageWrite: function(promise) {
       var writes = this.writes;
@@ -63,18 +80,22 @@
     onError: 'memory',
     onUpdate: ''
   },
+  // TODO(rosswang): end transitional
 
   init: function(sbPromise) {
+    this.sbPromise = sbPromise;
+
+    // TODO(rosswang): transitional
     var self = this;
 
     this.writes = new Set();
-    this.sbPromise = sbPromise;
 
     sbPromise.then(function(syncbase) {
       syncbase.onError.add(self.onError);
       syncbase.onUpdate.add(self.processUpdates);
     }).catch(this.onError);
+    // TODO(rosswang): end transitional
   }
 });
 
-module.exports = DeferredSbWrapper;
\ No newline at end of file
+module.exports = DeferredSbWrapper;
diff --git a/src/util/define-class.js b/src/util/define-class.js
index 0873408..5d1bf7e 100644
--- a/src/util/define-class.js
+++ b/src/util/define-class.js
@@ -64,7 +64,7 @@
       if ($.isArray(def.events)) {
         $.each(def.events, function(i, event) {
           if ($.type(event) === 'string') {
-            defineEvent(pthis, ifc, event);
+            pthis[event] = defineEvent(ifc, event);
           } else {
             defineEventsFromObject(pthis, ifc, event);
           }
@@ -97,6 +97,10 @@
     $.extend(constructor, def.statics);
   }
 
+  // The function bodies aren't actually useful but the function objects provide
+  // useful reflective properties.
+  constructor.ifc = def.publics;
+
   return constructor;
 }
 
@@ -166,35 +170,37 @@
   return proxy;
 }
 
-function defineEvent(pthis, ifc, name, flags) {
+defineClass.event = defineEvent;
+
+function defineEvent(ifc, name, flags) {
   var dispatcher = $.Callbacks(flags);
   //Use polyBind on function that fires to add the callable syntactic sugar
-  var callableDispatcher = pthis[name] = polyBind(function() {
+  var callableDispatcher = polyBind(function() {
     dispatcher.fireWith.call(dispatcher, ifc, arguments);
   }, dispatcher, dispatcher, false);
 
-  if (flags && flags.indexOf('private') > -1) {
-    return;
+  if (!(flags && flags.indexOf('private') > -1)) {
+    if (flags && flags.indexOf('public') > -1) {
+      ifc[name] = callableDispatcher;
+    } else {
+      var publicEvent = {};
+      /* We'll want the context to actually be callableDispatcher even though
+       * the interface and functionality of dispatcher suffice so that we can
+       * late-bind to the instance exposed to private this. */
+      polyBind(publicEvent, callableDispatcher,
+        ['disabled', 'fired', 'has', 'locked'], true);
+      polyReflexiveLateBind(publicEvent, callableDispatcher,
+        ['add', 'disable', 'empty', 'lock', 'remove']);
+
+      ifc[name] = publicEvent;
+    }
   }
 
-  if (flags && flags.indexOf('public') > -1) {
-    ifc[name] = callableDispatcher;
-  } else {
-    var publicEvent = {};
-    /* We'll want the context to actually be callableDispatcher even though
-     * the interface and functionality of dispatcher suffice so that we can
-     * late-bind to the instance exposed to private this. */
-    polyBind(publicEvent, callableDispatcher,
-      ['disabled', 'fired', 'has', 'locked'], true);
-    polyReflexiveLateBind(publicEvent, callableDispatcher,
-      ['add', 'disable', 'empty', 'lock', 'remove']);
-
-    ifc[name] = publicEvent;
-  }
+  return callableDispatcher;
 }
 
 function defineEventsFromObject(pthis, ifc, events) {
   $.each(events, function(event, flags) {
-    defineEvent(pthis, ifc, event, flags);
+    pthis[event] = defineEvent(ifc, event, flags);
   });
 }
diff --git a/src/vanadium-wrapper/syncbase-wrapper.js b/src/vanadium-wrapper/syncbase-wrapper.js
index 882dd2d..59b4555 100644
--- a/src/vanadium-wrapper/syncbase-wrapper.js
+++ b/src/vanadium-wrapper/syncbase-wrapper.js
@@ -44,6 +44,10 @@
   return key.join('.');
 }
 
+function splitKey(key) {
+  return key.split('.');
+}
+
 /**
  * Translate Syncbase hierarchical keys to object structure for easier
  * processing. '.' is chosen as the separator; '/' is reserved in Syncbase.
@@ -52,11 +56,9 @@
  * need regex escaping.
  */
 function recursiveSet(root, key, value) {
-  var matches = /\.?([^\.]*)(.*)/.exec(key);
-  var member = matches[1];
-  var remaining = matches[2];
+  var member = key[0];
 
-  if (remaining) {
+  if (key.length > 1) {
     var child = root[member];
     if (!child) {
       child = root[member] = {};
@@ -64,7 +66,7 @@
       child = root[member] = { _: child };
     }
 
-    recursiveSet(child, remaining, value);
+    recursiveSet(child, key.slice(1), value);
   } else {
     var obj = root[member];
     if (obj) {
@@ -75,6 +77,22 @@
   }
 }
 
+function recursiveDelete(root, key) {
+  var member = key[0];
+
+  if (key.length > 1) {
+    var child = root[member];
+    if (typeof child === 'object') {
+      recursiveDelete(child, key.slice(1));
+      if ($.isEmptyObject(child)) {
+        delete root[member];
+      }
+    }
+  } else {
+    delete root[member];
+  }
+}
+
 var SG_MEMBER_INFO = new syncbase.nosql.SyncgroupMemberInfo();
 
 // TODO(rosswang): generalize this
@@ -168,6 +186,8 @@
       return this.manageWrite(this.standardDelete(this.deleteFromSyncbase, k));
     },
 
+    // TODO(rosswang): transitional
+
     getData: function() {
       return this.data;
     },
@@ -260,7 +280,7 @@
                * Leave this handler attached but no-oping to drain the stream.
                */
             } else {
-              recursiveSet(newData, row[0], row[1]);
+              recursiveSet(newData, splitKey(row[0]), row[1]);
             }
           }).on('error', reject);
         }).catch(function(err) {
@@ -273,6 +293,8 @@
       }
     },
 
+    // TODO(rosswang): end transitional
+
     syncgroup: function(sgAdmin, name) {
       var self = this;
 
@@ -384,10 +406,197 @@
       };
 
       return sgp;
-    }
+    },
+
+    /**
+     * @return {
+     *    data,
+     *    onChange*(key, ?value, continued),
+     *    onUpdate*(data),
+     *    onError*(err),
+     *    onClose*(?err)
+     *  }
+     */
+    getWatchedObject: function(prefix) {
+      var result = {
+        data: {}
+      };
+
+      var onChange = defineClass.event(result, 'onChange');
+      var onUpdate = defineClass.event(result, 'onUpdate');
+      var onError = defineClass.event(result, 'onError');
+      var onClose = defineClass.event(result, 'onClose', 'memory');
+
+      function put(k, v) {
+        recursiveSet(result.data, k, v);
+      }
+
+      return this.getRawWatched(prefix, {
+        onData: put
+      }, {
+        onPut: function(k, v, continued) {
+          put(k, v);
+          onChange(k, v, continued);
+        },
+        onDelete: function(k, continued) {
+          recursiveDelete(result.data, k);
+          onChange(k, null, continued);
+        },
+        onBatchEnd: function() {
+          onUpdate(result.data);
+        },
+        onError: onError,
+        onClose: onClose
+      }).then(function() {
+        return result;
+      });
+    },
+
+    /**
+     * Pulls data from Syncbase and registers watch handlers. Returns a promise
+     * resolving after the initial pull.
+     *
+     * @param pullHandler { onData(key, value), onError(err) }
+     * @param streamHandler {
+     *    ?onPut(key, value, continued),
+     *    ?onDelete(key, continued),
+     *    ?onBatchEnd(),
+     *    onError(err),
+     *    onClose(?err)
+     *  }; These are callbacks rather than events to guarantee that no updates
+     *  are missed. `continued` indicates whether a change is followed by more
+     *  changes in same batch.
+     * @return a promise resolving after the initial pull completes. Watch
+     *  callbacks may continue to be called until onClose.
+     */
+    getRawWatched: function(prefix, pullHandler, streamHandler) {
+      var self = this;
+
+      var resumeMarker;
+
+      var opts = new syncbase.nosql.BatchOptions();
+      return self.runInBatch(self.context, self.db, opts, function(db, cb) {
+          Promise.all([
+            self.pull2(db, prefix, pullHandler.onData, pullHandler.onError),
+            promisify(db.getResumeMarker.bind(db))(self.context)
+          ]).then(function(args) {
+            resumeMarker = args[1];
+            cb('abort');
+          }, cb);
+        }).catch(function(err) {
+          if (err !== 'abort') {
+            throw err;
+          }
+
+          var stream = self.db.watch(self.context, 't', joinKey(prefix),
+            resumeMarker, streamHandler.onClose);
+          stream.on('data', function(change) {
+            try {
+              switch(change.changeType) {
+              case 'put':
+                new Promise(function(resolve, reject) {
+                  if (streamHandler.onPut) {
+                    change.getValue(function(err, value) {
+                      if (err) {
+                        reject(err);
+                      } else {
+                        resolve(streamHandler.onPut(splitKey(change.rowName),
+                          value, change.continued));
+                      }
+                    });
+                  } else {
+                    resolve();
+                  }
+                }).then(function() {
+                  if (!change.continued && streamHandler.onBatchEnd) {
+                    return streamHandler.onBatchEnd();
+                  }
+                }).catch(streamHandler.onError);
+                break;
+              case 'delete':
+                new Promise(function(resolve, reject) {
+                  if (streamHandler.onDelete) {
+                    resolve(streamHandler.onDelete(splitKey(change.rowName),
+                      change.continued));
+                  } else {
+                    resolve();
+                  }
+                }).then(function() {
+                  if (!change.continued && streamHandler.onBatchEnd) {
+                    return streamHandler.onBatchEnd();
+                  }
+                }).catch(streamHandler.onError);
+                break;
+              default:
+                streamHandler.onError(
+                  new Error('Invalid change type ' + change.changeType));
+              }
+            } catch(err) {
+              streamHandler.onError(err);
+            }
+          }).on('error', streamHandler.onError);
+        });
+    },
   },
 
   privates: {
+    /**
+     * TODO(rosswang): transitional: pull2 => pull
+     *
+     * Handlers may continue to be called even after the promise has been
+     * rejected.
+     *
+     * @param onData handler callback that takes a key, value pair.
+     * @param onError handler callback that takes an error.
+     * @return a promise that resolves when the pull is complete or rejects if
+     *  the pull failed or either handler threw.
+     */
+    pull2: function(db, prefix, onData, onError) {
+      var self = this;
+
+      return new Promise(function(resolve, reject) {
+        var isHeader = true;
+
+        var query = 'select k, v from t';
+        if (prefix) {
+          query += ' where k like "' + joinKey(prefix) + '%"';
+        }
+
+        db.exec(self.context, query, function(err) {
+          if (err) {
+            reject(err);
+          } else {
+            resolve();
+          }
+        }).on('data', function(row) {
+          if (isHeader) {
+            isHeader = false;
+            return;
+          }
+
+          try {
+            onData(splitKey(row[0]), row[1]);
+          } catch (err) {
+            reject(err);
+          }
+        }).on('error', function(err) {
+          if (!onError) {
+            reject(err);
+          } else {
+            try {
+              onError(err);
+            } catch (err2) {
+              reject(err2);
+            }
+          }
+        });
+      });
+    },
+
+    /* TODO(rosswang): Keep this around even though the dirty flag and write
+     * records are not used since watch integration, because there is still a
+     * potential race condition; I'm just not particularly sure how to deal with
+     * it yet. If it turns out we don't have to worry about it, delete this. */
     manageWrite: function(promise) {
       var writes = this.writes;
 
@@ -417,10 +626,12 @@
 
   constants: [ 'mountName' ],
 
+  // TODO(rosswang): transitional
   events: {
     onError: 'memory',
     onUpdate: '',
   },
+  // TODO(rosswang): end transitional
 
   init: function(context, db, mountName) {
     // TODO(rosswang): mountName probably won't be necessary after syncgroup
@@ -437,8 +648,7 @@
     this.putToSyncbase = promisify(this.t.put.bind(this.t));
     this.deleteFromSyncbase = promisify(this.t.deleteRange.bind(this.t));
 
-    // Start the watch loop to periodically poll for changes from sync.
-    // TODO(rosswang): Remove this once we have client watch.
+    // TODO(rosswang): transitional
     function watchLoop() {
       if (!self.pull.current) {
         self.refresh().catch(self.onError);
@@ -446,6 +656,7 @@
       setTimeout(watchLoop, 500);
     }
     process.nextTick(watchLoop);
+    // TODO(rosswang): end transitional
   }
 });
 
diff --git a/test/travel.js b/test/travel.js
index b4ae816..19ec9e6 100644
--- a/test/travel.js
+++ b/test/travel.js
@@ -30,7 +30,7 @@
  */
 var STABLE_SLA = 2500;
 var SYNC_SLA = MockSyncbaseWrapper.SYNC_SLA;
-var DEVICE_DISCOVERY_SLA = 5000;
+var DEVICE_DISCOVERY_SLA = 3000;
 
 function cleanDom() {
   $('body').empty();
diff --git a/test/vanadium-wrapper.js b/test/vanadium-wrapper.js
index da67958..dbe22ff 100644
--- a/test/vanadium-wrapper.js
+++ b/test/vanadium-wrapper.js
@@ -32,7 +32,7 @@
       return context;
     },
     function(err) {
-      t.fail('init error: ' + err);
+      t.fail('init error: ' + err.stack);
     });
 
   mockVanadium.finishInit(null, mockRuntime);