reader: add support for sync groups.

Several changes to support adding sync groups have been made to
`browser/vanadium/syncbase`. Raw Syncbase API calls have been broken out into
separate, reusable modules for accomplishing simple tasks like getting a
database or a table. All `syncbased` instances are started with very permissive
ACLs to get a minimum working example of sync between peers (SEE: #19). Code has
been added to support watching for changes to originate from sync. Peer
discovery has been temporarily removed since all previous data sharing now goes
through Syncbase.

The browser/vanadium/glob-stream module was refactored due to a serious bug
where failing RPC requests were not being handled correctly. This caused a
recursive loop of uncaught exceptions.

Some additional, smaller changes in this CL.

* Old, non-syncbase storage and transfer code has been removed.
* Added a quick way to change an application id via query param.
* Added a generic error wrapper to add more context to Vanadium RPC errors.

Closes #16

Change-Id: I6d45790de7551386bfcbc6c5ec3b93ef47066fc2
diff --git a/Makefile b/Makefile
index 1979a27..6095f1c 100644
--- a/Makefile
+++ b/Makefile
@@ -13,6 +13,7 @@
 host ?= 127.0.0.1
 port ?= 8080
 syncbase_port ?= 4000
+cloudsync_port ?= 8000
 id ?= $(shell if test -e tmp/id; then cat tmp/id; else bin/cuid; fi)
 
 all: public/bundle.js node_modules
@@ -39,7 +40,7 @@
 		$< 1> $@
 
 .PHONY:
-distclean:
+distclean: clean
 	@$(RM) -fr node_modules
 	@v23 goext distclean
 
@@ -48,8 +49,9 @@
 	@$(RM) -fr npm-debug.log
 	@$(RM) -fr public/bundle.js
 	@$(RM) -fr tmp
-	@$(RM) -fr bin/syncbased
-	@$(RM) -fr bin/principal
+	@$(RM) -fr credentials
+	@$(RM) -f bin/principal
+	@$(RM) -f bin/syncbased
 
 .PHONY:
 lint: node_modules
@@ -100,4 +102,19 @@
 		--v23.proxy="/ns.dev.v.io:8101/proxy" \
 		--v23.tcp.address=":$(syncbase_port)" \
 		--v23.credentials="credentials" \
-		--v23.permissions.literal='{"Admin":{"In":["$(blessing)"]},"Write":{"In":["$(blessing)"]},"Read":{"In":["$(blessing)"]},"Resolve":{"In":["$(blessing)"]},"Debug":{"In":["$(blessing)"]}}'
+		--v23.permissions.literal='{"Admin":{"In":["..."]},"Write":{"In":["..."]},"Read":{"In":["..."]},"Resolve":{"In":["..."]},"Debug":{"In":["..."]}}'
+
+.PHONY:
+cloudsync: bin/syncbased credentials tmp
+	$(eval blessing := $(shell bin/principal dump --v23.credentials=./credentials -s=true))
+	$(eval email := $(subst dev.v.io/u/,,$(blessing)))
+	./bin/syncbased \
+		--v=5 \
+		--alsologtostderr=false \
+		--root-dir="tmp/cloudsync" \
+		--name="users/$(email)/reader/cloudsync" \
+		--v23.namespace.root="/ns.dev.v.io:8101" \
+		--v23.proxy="/ns.dev.v.io:8101/proxy" \
+		--v23.tcp.address=":$(cloudsync_port)" \
+		--v23.credentials="credentials" \
+		--v23.permissions.literal='{"Admin":{"In":["..."]},"Write":{"In":["..."]},"Read":{"In":["..."]},"Resolve":{"In":["..."]},"Debug":{"In":["..."]}}'
diff --git a/README.md b/README.md
index b090509..7b8b937 100644
--- a/README.md
+++ b/README.md
@@ -41,6 +41,28 @@
 
     make syncbase
 
-This will automatically have you set up credentials etc. If you want to remove stored data & credentials use:
+Run the syncbase instance which hosts the syncgroup:
+
+    make cloudsync
+
+This will run a syncbased instance that will mount as
+"users/<email>/reader/cloudsync" and host the syncgroup. In order for peers to
+sync an instance of this needs to be running somewhere.
+
+To run a new syncbase peer and corresponding application use variables to change
+the startup settings:
+
+    syncbase_port=8888 id=`./bin/cuid` make syncbase
+
+This will generate a new client id and start a new syncbased instance on a
+different port. The generated id can be grabbed from the standard out and will
+look something like this "cif7en1kb00007uigyohv58tx". Once you have the id you
+can open a new browser window and use the id in a query param to initialize the
+application to connect as that peer.
+
+    http://127.0.0.1:8080/?id=<id>
+
+This will automatically have you set up credentials etc. If you want to remove
+stored data & credentials use:
 
     make clean
diff --git a/browser/components/constellation/state.js b/browser/components/constellation/state.js
index f6f6373..aa6f13b 100644
--- a/browser/components/constellation/state.js
+++ b/browser/components/constellation/state.js
@@ -2,13 +2,10 @@
 // Use of this source code is governed by a BSD-style
 // license that can be found in the LICENSE file.
 
-var debug = require('debug')('reader:constellation:state');
 var hg = require('mercury');
 
 // Create state for the constellation component
 module.exports = function create(options) {
-  debug('initialize state');
-
   var state = hg.state({
     peers: hg.varhash({}, peer),
     error: hg.value(null),
diff --git a/browser/components/pdf/state.js b/browser/components/pdf/state.js
index 73dc260..117971c 100644
--- a/browser/components/pdf/state.js
+++ b/browser/components/pdf/state.js
@@ -9,8 +9,6 @@
 module.exports = create;
 
 function create(options) {
-  debug('creating PDF state: %o', options);
-
   var state = hg.state({
     error: hg.value(null),
     pdf: hg.value(null),
diff --git a/browser/hash-object.js b/browser/hash-object.js
deleted file mode 100644
index e1d0c72..0000000
--- a/browser/hash-object.js
+++ /dev/null
@@ -1,16 +0,0 @@
-// Copyright 2015 The Vanadium Authors. All rights reserved.
-// Use of this source code is governed by a BSD-style
-// license that can be found in the LICENSE file.
-
-var sha256d = require('sha256d');
-
-module.exports = hash;
-
-function hash(data) {
-  var h = sha256d();
-  var string = JSON.stringify(data);
-
-  h.update(string);
-
-  return h.digest('hex');
-}
diff --git a/browser/main.js b/browser/main.js
index 95c401a..d135436 100644
--- a/browser/main.js
+++ b/browser/main.js
@@ -21,8 +21,13 @@
 var routes = require('./routes');
 var vanadium = require('./vanadium');
 var window = require('global/window');
+var qs = require('qs');
+var url = require('url');
 
+// Expose globals for debugging.
 window.debug = require('debug');
+window.require = require;
+global.require = require;
 
 domready(function ondomready() {
   debug('domready');
@@ -38,30 +43,53 @@
 
   // TODO(jasoncampbell): add an error component for aggregating, logging, and
   // displaying errors in the UI.
-  state.error(function(err) {
+  state.error(function onstateerror(err) {
     if (!err) {
       return;
     }
 
-    console.error('TODO: add an error component');
+    console.error('TODO(jasoncampbell): add an error component');
     console.error(err.stack);
+
+    for (var key in err) {
+      if (err.hasOwnProperty(key)) {
+        console.error('* %s => %o', key, err[key]);
+      }
+    }
   });
 
   router(state, routes).on('notfound', notfound);
 
+  // Quick way to change the id of the running application using a query param.
+  // TODO(jasoncampbell): Create a configuration screen/component.
+  var query = url.parse(window.location.href).query;
+  var id = qs.parse(query).id || process.env.ID;
+
+  debug('##### %s #####', id);
+
   // The vanadium client is coupled to the application state here so that async
   // code paths in the ./vanadium modules can be isolated to the application
   // initialization. This allows components to be separately tested/interacted
   // with as mappings between data and UI without being tangled into the
   // local vanadium discovery process.
-  var client = vanadium({
-    id: process.env.ID,
-    state: state.constellation
+  var client = vanadium({ id: id });
+
+  client.on('error', function onvanadiumerror(err) {
+    state.error.set(err);
   });
 
   client.on('syncbase', function onsyncbase(store) {
     state.store.set(store);
 
+    store.sync(function onsync(err) {
+      if (err) {
+        state.error.set(err);
+        return;
+      }
+
+      debug('store.sync succeeded!');
+    });
+
     // Setup watch.
     var ws = store.createWatchStream('files');
 
@@ -71,14 +99,23 @@
       }
     });
 
-    ws.on('data', function(change) {
+    ws.on('data', function onwatchchange(change) {
       debug('watch stream change: %o', change);
+      // NOTE: this triggers a recursion between clients :(
+
+      if (change.type === 'put') {
+        state.files.collection.put(change.key, change.value);
+      }
+
+      if (change.type === 'delete') {
+        state.files.collection.delete(change.key);
+      }
     });
 
     // Scan all keys and populate state.
     var stream = store.createReadStream('files');
 
-    stream.on('data', function(data) {
+    stream.on('data', function onreadstreamdata(data) {
       state.files.collection.put(data.key, data.value);
     });
 
@@ -113,8 +150,6 @@
     });
 
     each(collection, function(key, value) {
-      debug('each iterator: %o', value);
-
       store.put('files', value, function callback(err, file) {
         if (err) {
           state.error.set(err);
diff --git a/browser/vanadium/error.js b/browser/vanadium/error.js
new file mode 100644
index 0000000..50619fd
--- /dev/null
+++ b/browser/vanadium/error.js
@@ -0,0 +1,17 @@
+// Copyright 2015 The Vanadium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+module.exports = error;
+
+function error(err, message, callback) {
+  var werr = new Error(message);
+  werr.cause = err;
+
+  if (callback) {
+    callback(werr);
+    return;
+  }
+
+  return werr;
+}
diff --git a/browser/vanadium/glob-stream.js b/browser/vanadium/glob-stream.js
index 68e220a..6719ab0 100644
--- a/browser/vanadium/glob-stream.js
+++ b/browser/vanadium/glob-stream.js
@@ -3,23 +3,37 @@
 // license that can be found in the LICENSE file.
 
 var assert = require('assert');
+var eos = require('end-of-stream');
+var error = require('./error');
+var format = require('util').format;
 var ms = require('ms');
+var once = require('once');
 var through = require('through2');
 
 module.exports = createStream;
 
-// Returns a flowing glob stream that emits vanadium names. If options.name is
-// present, the stream will recursively call glob until that name is discovered.
+// Returns a flowing glob stream that emits vanadium names, will recursively
+// glob until options.name is discovered.
 function createStream(options) {
   var found = false;
   var stream = through(write);
+  var runtime = options.runtime;
+  var pattern = options.pattern;
 
   assert.ok(options.name, 'options.name is required');
 
-  glob(options, stream, done);
+  _glob();
 
   return stream;
 
+  function _glob() {
+    var gs = glob(runtime, pattern);
+
+    eos(gs, end);
+    gs.pipe(stream);
+  }
+
+  // Track if options.name was found.
   function write(buffer, enc, callback) {
     if (options.name === buffer.toString()) {
       found = true;
@@ -28,56 +42,43 @@
     callback(null, buffer);
   }
 
-  function done(err) {
+  function end(err) {
     if (err) {
       stream.emit('error', err);
-      stream.end();
+      return;
     }
 
     if (found) {
       stream.end();
     } else {
-      glob(options, stream, done);
+      _glob(runtime, pattern);
     }
   }
 }
 
-// This function will do a single run of namespace.glob and proxy data events to
-// the passed in stream, the callback will be called when the stream has
-// finished.
-//
-// NOTE: recursive polling of the mountable eats about 10% of my CPU on a
-// macbook air.
-function glob(options, stream, done) {
-  assert.ok(options, 'options object is required');
-  assert.ok(options.runtime, 'options.runtime is required');
-  assert.ok(options.pattern, 'options.pattern is required');
-
-  var runtime = options.runtime;
+function glob(runtime, pattern) {
   var namespace = runtime.getNamespace();
-  var context = runtime.getContext().withTimeout(options.timeout || ms('12s'));
-  var promise = namespace.glob(context, options.pattern);
+  var context = runtime.getContext();
+  var ctx = context.withTimeout(ms('10s'));
+  var done = once(end);
+  var stream = through.obj(write);
 
-  promise.catch(function(err) {
-    throw err;
-  });
+  var gs = namespace.glob(ctx, pattern, done).stream;
 
-  // NOTE: The return value from .glob is a promise, to access the stream use
-  // the .stream attribute.
-  promise.stream.on('data', function onentry(entry) {
-    stream.write(entry.name);
-  });
+  eos(gs, done);
+  gs.pipe(stream);
 
-  // NOTE: Piping namespace.glob streams causes all kinds of errors which do not
-  // directly related to the stream being ended.
-  // TODO(jasoncampbell): This thing leaks random errors like crazy, this should
-  // get fixed so that errors are related and actionable and tools like
-  // end-of-stream can be used.
-  promise.stream.on('error', function onerror(err) {
-    throw err;
-    // maybe proxying errors wont be that bad if vanadium errors can be detected
-    // and split out...
-  });
+  return stream;
 
-  promise.stream.on('end', done);
+  function end(err) {
+    if (err) {
+      var message = format('Globbing "%s" failed', pattern);
+      stream.emit('error', error(err, message));
+    }
+  }
+
+  // Transform Glob entries to single names.
+  function write(entry, enc, cb) {
+    cb(null, entry.name);
+  }
 }
diff --git a/browser/vanadium/index.js b/browser/vanadium/index.js
index bbfb970..2f29511 100644
--- a/browser/vanadium/index.js
+++ b/browser/vanadium/index.js
@@ -3,31 +3,29 @@
 // license that can be found in the LICENSE file.
 
 var assert = require('assert');
-var BlobReader = require('readable-blob-stream');
 var debug = require('debug')('reader:vanadium');
+var eos = require('end-of-stream');
 var EventEmitter = require('events').EventEmitter;
-var filter = require('./filter-stream');
 var glob = require('./glob-stream');
 var inherits = require('inherits');
 var ms = require('ms');
 var once = require('once');
-var parallel = require('run-parallel');
+var prr = require('prr');
 var service = require('./service');
-var through = require('through2');
+var syncbase = require('./syncbase');
 var vanadium = require('vanadium');
 var waterfall = require('run-waterfall');
 var window = require('global/window');
-var syncbase = require('./syncbase');
-var prr = require('prr');
 
 module.exports = connect;
 
 function connect(options) {
   var client = new Client(options);
 
-  client.discover(function onmount(err, stream) {
+  client.mount(function onmount(err) {
     if (err) {
-      return options.error.set(err);
+      client.emit('error', err);
+      return;
     }
 
     debug('discovery is setup');
@@ -51,7 +49,6 @@
   client.name = '';
   client.mounted = false;
   client.runtime = {};
-  client.peers = options.state.peers;
   client.service = service(client);
 
   // TODO(jasoncampbell): Come up with a better way to couple the bare service
@@ -67,14 +64,13 @@
     var store = syncbase(options);
     prr(client, 'syncbase', store);
     client.emit('syncbase', store);
+    debug('runtime is available');
   });
 }
 
 inherits(Client, EventEmitter);
 
-Client.prototype.discover = function(callback) {
-  debug('initializing discovery');
-
+Client.prototype.mount = function(callback) {
   var client = this;
   var workers = [
     client.init.bind(client),
@@ -93,7 +89,6 @@
 };
 
 Client.prototype.init = function(callback) {
-  debug('calling init');
   var client = this;
 
   vanadium.init({
@@ -106,8 +101,6 @@
       return callback(err);
     }
 
-    debug('runtime is ready');
-
     // TODO(jasoncampbell): When this happens the window really, really needs to
     // be reloaded. In order to safely reload the page state should be stored or
     // serialized in a way that makes it recoverable for this error case.
@@ -123,8 +116,6 @@
     // <prefix>/reader/syncgroup
     var name = getName(runtime, client.id);
 
-    debug('name: %s', name);
-
     client.name = name;
 
     client.emit('runtime', runtime);
@@ -137,6 +128,7 @@
   var client = this;
   var service = client.service;
 
+  debug('serve: %s', client.name);
   runtime.newServer(client.name, service, onserve);
 
   function onserve(err, server) {
@@ -144,8 +136,6 @@
       return callback(err);
     }
 
-    debug('service is ready');
-
     window.addEventListener = window.addEventListener || noop;
     window.addEventListener('beforeunload', beforeunload);
 
@@ -165,14 +155,13 @@
   }
 };
 
-Client.prototype.glob = function(runtime, onmount) {
-  onmount = once(onmount);
+// Globs until mounted.
+Client.prototype.glob = function(runtime, callback) {
+  callback = once(callback);
 
   var client = this;
-  var peers = client.peers;
   // Glob pattern based on "<prefix>/reader/:id/app"
   var pattern = prefix(runtime).replace('/chrome', '') + '/reader/*/app';
-
   var stream = glob({
     name: client.name,
     runtime: runtime,
@@ -180,30 +169,37 @@
     timeout: ms('12s')
   });
 
-  stream.on('error', function(err) {
-    debug('glob-stream error: %s', err.stack);
-    client.emit('error', err);
-  });
+  // Glob stream ends once the mounted client.name has been discovered or there
+  // is an error.
+  //
+  // NOTE: Streaming names through service connection has been commented out,
+  // this will will be addressed later as the previous client/service model is
+  // unessecary given the way syncbase is being leveraged.
+  //
+  // TODO(jasoncampbell): Hook up a simple discovery mechanism to detect new
+  // peers and share information about constellation state like disconnects.
+  eos(stream, { readable: false }, callback);
 
-  stream
-  .pipe(filter(peers))
-  .pipe(through(write))
-  .on('error', function(err) {
-    debug('peer-stream error: %s', err.stack);
-    client.emit('error', err);
-  });
 
-  function write(buffer, enc, cb) {
-    var name = buffer.toString();
-
-    client.connect(name);
-
-    if (name === client.name) {
-      onmount();
-    }
-
-    cb(null, buffer);
-  }
+  // stream
+  // .pipe(filter(peers))
+  // .pipe(through(write))
+  // .on('error', function(err) {
+  //   debug('peer-stream error: %s', err.stack);
+  //   client.emit('error', err);
+  // });
+  //
+  // function write(buffer, enc, cb) {
+  //   var name = buffer.toString();
+  //
+  //   client.connect(name);
+  //
+  //   if (name === client.name) {
+  //     onmount();
+  //   }
+  //
+  //   cb(null, buffer);
+  // }
 };
 
 Client.prototype.connect = function(name) {
@@ -310,48 +306,6 @@
   return tasks;
 };
 
-Client.prototype.sendPDF = function(key, file, callback) {
-  var client = this;
-  var runtime = client.runtime;
-  var context = runtime.getContext();
-  var tasks = client.remotes('connected', createWorker);
-  var meta = {
-    hash: key,
-    name: file.name,
-    size: file.size,
-    type: file.type
-  };
-
-  // Execute tasks across peers in parallel.
-  parallel(tasks, function done(err, results) {
-    if (err) {
-      callback(err);
-      return;
-    }
-
-    callback(null, results);
-  });
-
-  function createWorker(peer) {
-    debug('created worker for remote %s', peer.uuid);
-    return worker;
-
-    function worker(callback) {
-      callback = once(callback);
-      debug('sending PDF (%d bytes) to %s', file.size, peer.uuid);
-
-      var promise = peer.remote.savePDF(context, meta, callback);
-      var stream = promise.stream;
-      var bs = new BlobReader(file);
-
-      bs.on('error', callback);
-      stream.on('end', callback);
-
-      bs.pipe(stream);
-    }
-  }
-};
-
 // TODO(jasoncampbell): Move naming related code into a separate module.
 function getName(runtime, id) {
   var p = prefix(runtime).replace('/chrome', '');
diff --git a/browser/vanadium/syncbase/db.js b/browser/vanadium/syncbase/db.js
new file mode 100644
index 0000000..3b95e17
--- /dev/null
+++ b/browser/vanadium/syncbase/db.js
@@ -0,0 +1,62 @@
+// Copyright 2015 The Vanadium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+var error = require('../error');
+var format = require('util').format;
+var ms = require('ms');
+var pass = require('./pass');
+var syncbase = require('syncbase');
+var vanadium = require('vanadium');
+var verror = vanadium.verror;
+var waterfall = require('run-waterfall');
+
+module.exports = create;
+
+// Creates the syncbase structure up to the databse level. The database will be
+// mounted as: <name>/reader/db.
+function create(context, name, done) {
+  var tasks = [
+    pass(context, name),
+    createApp,
+    createDB
+  ];
+
+  waterfall(tasks, done);
+}
+
+function createApp(context, name, callback) {
+  var service = syncbase.newService(name);
+  var app = service.app('reader');
+  var ctx = context.withTimeout(ms('5s'));
+  var permissions = {};
+
+  app.create(ctx, permissions, onapp);
+
+  function onapp(err) {
+    if (err && !(err instanceof verror.ExistError)) {
+      var template = 'syncbase - app.create(...) failed\n * name: %s';
+      var message = format(template, name);
+      error(err, message, callback);
+    } else {
+      callback(null, context, app);
+    }
+  }
+}
+
+function createDB(context, app, callback) {
+  var db = app.noSqlDatabase('db');
+  var ctx = context.withTimeout(ms('5s'));
+  var permissions = {};
+
+  db.create(ctx, permissions, ondb);
+
+  function ondb(err) {
+    if (err && !(err instanceof verror.ExistError)) {
+      var message = format('syncbase - db.create(...) failed');
+      error(err, message, callback);
+    } else {
+      callback(null, db);
+    }
+  }
+}
diff --git a/browser/vanadium/syncbase/get.js b/browser/vanadium/syncbase/get.js
new file mode 100644
index 0000000..53c11c3
--- /dev/null
+++ b/browser/vanadium/syncbase/get.js
@@ -0,0 +1,35 @@
+// Copyright 2015 The Vanadium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+var ms = require('ms');
+var parse = require('./parse');
+var pass = require('./pass');
+var waterfall = require('run-waterfall');
+
+module.exports = getData;
+
+function getData(context, table, key, callback) {
+  var tasks = [
+    pass(context, table, key),
+    get,
+    parse
+  ];
+
+  waterfall(tasks, callback);
+}
+
+function get(context, table, key, callback) {
+  var ctx = context.withTimeout(ms('5s'));
+
+  table.get(ctx, key, onGet);
+
+  function onGet(err, string) {
+    if (err) {
+      callback(err);
+      return;
+    }
+
+    callback(null, string);
+  }
+}
diff --git a/browser/vanadium/syncbase/index.js b/browser/vanadium/syncbase/index.js
index bc31e0a..d717111 100644
--- a/browser/vanadium/syncbase/index.js
+++ b/browser/vanadium/syncbase/index.js
@@ -4,17 +4,23 @@
 
 var assert = require('assert');
 var BlobReader = require('readable-blob-stream');
+var db = require('./db');
 var debug = require('debug')('reader:syncbase');
+var dz = require('dezalgo');
 var eos = require('end-of-stream');
+var error = require('../error');
 var EventEmitter = require('events').EventEmitter;
 var extend = require('xtend');
+var format = require('util').format;
+var get = require('./get');
 var inherits = require('inherits');
-var json = require('./json');
 var ms = require('ms');
 var once = require('once');
+var parse = require('./parse');
 var prr = require('prr');
-var setup = require('./setup-db');
+var put = require('./put');
 var syncbase = require('syncbase');
+var table = require('./table');
 var through = require('through2');
 var util = require('../util');
 var vanadium = require('vanadium');
@@ -23,8 +29,14 @@
 
 module.exports = Store;
 
-// Naming for the DB setup looks like <app-name>/<db-name>/<table-name> and
-// <db-name> is hard coded to "db".
+// Naming for the Syncbase setup looks like <app-name>/<db>/<table>, these
+// values are hard-coded in ./db.js and result in serices being mounted with the
+// following structure:
+//
+//    <name>/reader/db/<table>
+//
+// The app, database, table setup is lazily evaluated and will create missing
+// components in the corresponding syncbase if they are missing.
 function Store(options) {
   if (!(this instanceof Store)) {
     return new Store(options);
@@ -39,76 +51,97 @@
 
   prr(store, 'runtime', options.runtime);
   prr(store, 'name', options.name, 'e');
-  prr(store, 'tables', {}, 'e');
+  prr(store, '_tables', {});
+  prr(store, '_db', {}, 'c');
+
+  store._status = 'new';
+  store.on('status', onstatus);
+
+  function onstatus(status) {
+    debug('status: %s', status);
+    store._status = status;
+  }
 }
 
 inherits(Store, EventEmitter);
 
-Store.prototype.db = function(callback) {
+Store.prototype.status = function(status) {
   var store = this;
-  var runtime = store.runtime;
-  var name = store.name;
-
-  if (store._db) {
-    return process.nextTick(function next() {
-      callback(null, store._db);
-    });
-  }
-
-  setup(runtime, name, function onsetup(err, db) {
-    if (err) {
-      callback(err);
-      return;
-    }
-
-    // NOTE: It's possible for this async call to happen in paralell and a
-    // previos callback to have fired already. If that is the case do not re-set
-    // `store._db`.
-    // TODO: Wrap calls with some logic where pending callbacks can
-    // be triggered without making multiple `setup` calls.
-    if (! store._db) {
-      prr(store, '_db', db);
-    }
-
-    callback(null, db);
-  });
+  return store._status === status;
 };
 
-Store.prototype.table = function(name, callback) {
+Store.prototype.db = function(callback) {
+  callback = dz(callback);
+
   var store = this;
 
-  if (!!store.tables[name]) {
-    return process.nextTick(function next() {
-      callback(null, store.tables[name]);
-    });
+  if (store.status('ready')) {
+    callback(null, store._db);
+    return;
   }
 
-  store.db(function ondb(err, db) {
+  // In case a previous call is in process it's callback will handle the error
+  // case, but current calls will need to be queued. If the in-process call is
+  // successful queued callbacks can be called via the "db" event.
+  if (store.status('initializing')) {
+    store.once('db', callback.bind(null, null));
+    return;
+  }
+
+  store.emit('status', 'initializing');
+
+  var context = store.runtime.getContext();
+  var name = store.name;
+  db(context, name, ondb);
+
+  function ondb(err, db) {
     if (err) {
       callback(err);
       return;
     }
 
-    var table = db.table(name);
-    var runtime = store.runtime;
-    var context = util.timeout(runtime, ms('5s'));
-    var permissions = {};
+    prr(store, '_db', db);
+    store.emit('db', db);
+    store.emit('status', 'ready');
+    callback(null, db);
+  }
+};
 
-    table.create(context, permissions, function ontable(err) {
-      if (err && !(err instanceof verror.ExistError)) {
-        callback(err);
-        return;
-      }
+Store.prototype.table = function(keyspace, callback) {
+  callback = dz(callback);
 
-      store.tables[name] = table;
+  var store = this;
+  var cached = store._tables[keyspace];
 
-      callback(null, table);
-    });
-  });
+  if (cached) {
+    callback(null, cached);
+    return;
+  }
+
+  store.db(ondb);
+
+  function ondb(err, db) {
+    if (err) {
+      callback(err);
+      return;
+    }
+
+    var context = store.runtime.getContext();
+    table(context, db, keyspace, ontable);
+  }
+
+  function ontable(err, table) {
+    if (err) {
+      callback(err);
+      return;
+    }
+
+    store._tables[keyspace] = table;
+    callback(null, table);
+  }
 };
 
 Store.prototype.get = function(keyspace, key, callback) {
-  debug('#get("%s", "%s", callback)', keyspace, key);
   var store = this;
 
   store.table(keyspace, ontable);
@@ -119,27 +152,27 @@
       return;
     }
 
-    var runtime = store.runtime;
-    var context = util.timeout(runtime, ms('5s'));
-
-    table.get(context, key, onget);
+    var context = store.runtime.getContext();
+    get(context, table, key, onget);
   }
 
-  function onget(err, string) {
+  function onget(err, data) {
     if (err) {
-      callback(err);
+      var template = 'syncbase #get("%s", "%s", callback) failed';
+      var message = format(template, keyspace, key);
+      error(err, message, callback);
       return;
     }
 
-    json.decode(string, callback);
+    callback(null, data);
   }
 };
 
-Store.prototype.put = function(keyspace, value, callback) {
-  debug('#put("%s", %o, callback)', keyspace, value);
-  var store = this;
+Store.prototype.put = function(keyspace, data, callback) {
+  assert.ok(data.id, 'data.id is required.');
 
-  assert.ok(value.id, 'item.id is required.');
+  var store = this;
+  var context = store.runtime.getContext();
 
   store.table(keyspace, ontable);
 
@@ -148,58 +181,39 @@
       return callback(err);
     }
 
-    if (keyspace === 'files' && !value.ref) {
-      assert.ok(value.blob, 'item.blob is required.');
-      store.putBlob(value.blob, onref);
+    if (keyspace === 'files' && !data.ref) {
+      store.putBlob(data.blob, onref);
     } else {
-      put(table, value, callback);
+      put(context, table, data, callback);
     }
 
+    return;
+
     function onref(err, ref) {
       if (err) {
         return callback(err);
       }
 
-      value.ref = ref;
-      put(table, value, callback);
-    }
-
-    function put(table, value) {
-      var key = value.id;
-      var runtime = store.runtime;
-      var context = util.timeout(runtime, ms('5s'));
-
-      json.encode(value, function onjson(err, string){
-        table.put(context, key, string, onput);
-      });
-
-      function onput(err) {
-        if (err) {
-          return callback(err);
-        }
-
-        callback(null, value);
-      }
+      data.ref = ref;
+      put(context, table, data, callback);
     }
   }
 };
 
 Store.prototype.del = function(keyspace, key, callback) {
-  debug('#del("%s", "%s", callback)', keyspace, key);
   var store = this;
 
   store.table(keyspace, ontable);
 
   function ontable(err, table) {
-    var runtime = store.runtime;
-    var context = util.timeout(runtime, ms('5s'));
+    var context = store.runtime.getContext();
+    var ctx = context.withTimeout(ms('5s'));
 
-    table.delete(context, key, callback);
+    table.delete(ctx, key, callback);
   }
 };
 
 Store.prototype.putBlob = function(blob, callback) {
-  debug('#putBlob(%o, callback)', blob);
   assert.ok(blob instanceof window.Blob, 'Must use a Blob object.');
 
   var store = this;
@@ -211,10 +225,9 @@
       return callback(err);
     }
 
-    var runtime = store.runtime;
-    var context = util.timeout(runtime, ms('5s'));
-
-    db.createBlob(context, onblob);
+    var context = store.runtime.getContext();
+    var ctx = context.withTimeout(ms('5s'));
+    db.createBlob(ctx, onblob);
   }
 
   function onblob(err, vblob) {
@@ -224,10 +237,10 @@
     }
 
     var reader = new BlobReader(blob);
-    var runtime = store.runtime;
-    var context = util.timeout(runtime, ms('5s'));
+    var context = store.runtime.getContext();
+    var ctx = context.withTimeout(ms('5s'));
     var done = once(onput);
-    var writer = vblob.put(context, done);
+    var writer = vblob.put(ctx, done);
 
     eos(reader, done);
     eos(writer, { readable: false }, done);
@@ -240,10 +253,9 @@
         return;
       }
 
-      var runtime = store.runtime;
-      var context = util.timeout(runtime, ms('5s'));
-
-      vblob.commit(context, function oncommit(err) {
+      var context = store.runtime.getContext();
+      var ctx = context.withTimeout(ms('5s'));
+      vblob.commit(ctx, function oncommit(err) {
         if (err) {
           callback(err);
           return;
@@ -274,11 +286,11 @@
       return;
     }
 
-    var runtime = store.runtime;
-    var context = util.timeout(runtime, ms('5s'));
     var range = syncbase.nosql.rowrange.range(options.start, options.limit);
     var done = once(end);
-    var reader = table.scan(context, range, done);
+    var context = store.runtime.getContext();
+    var ctx = context.withTimeout(ms('5s'));
+    var reader = table.scan(ctx, range, done);
 
     eos(reader, done);
     reader.pipe(stream);
@@ -290,15 +302,17 @@
       key: chunk.key
     };
 
-    json.decode(chunk.value, function onvalue(err, value) {
+    parse(chunk.value, onparse);
+
+    function onparse(err, data) {
       if (err) {
         callback(err);
         return;
       }
 
-      item.value = value;
+      item.value = data;
       callback(null, item);
-    });
+    }
   }
 
   function end(err) {
@@ -323,10 +337,9 @@
       return;
     }
 
-    var runtime = store.runtime;
-    var context = util.timeout(runtime, ms('5s'));
-
-    db.getResumeMarker(context, onmarker);
+    var context = store.runtime.getContext();
+    var ctx = context.withTimeout(ms('5s'));
+    db.getResumeMarker(ctx, onmarker);
 
     function onmarker(err, marker) {
       if (err) {
@@ -334,11 +347,11 @@
         return;
       }
 
-      var runtime = store.runtime;
-      var context = util.cancel(runtime);
       var prefix = '';
       var done = once(end);
-      var reader = db.watch(context, keyspace, prefix, marker, done);
+      var context = store.runtime.getContext();
+      var ctx = context.withCancel();
+      var reader = db.watch(ctx, keyspace, prefix, marker, done);
 
       eos(reader, reader);
       reader.pipe(stream);
@@ -352,6 +365,8 @@
       return;
     }
 
+    debug('change: %o', chunk);
+
     var change = {
       type: chunk.changeType,
       key: chunk.rowName,
@@ -373,8 +388,9 @@
         return;
       }
 
+      debug('get success: %o', value);
       change.value = value;
-      callback(null, value);
+      callback(null, change);
     }
   }
 
@@ -385,3 +401,103 @@
     }
   }
 };
+
+Store.prototype.sync = function(callback) {
+  var _db;
+
+  var store = this;
+  var username = util.parseName(store.name).username;
+  var context = store.runtime.getContext();
+  // The Vanadium name of the running syncbase instance that hosts the sync
+  // group.
+  var name = 'users/' + username + '/reader/cloudsync';
+  var syncname = [
+    name,
+    '%%sync', // Syncbase naming scheme.
+    'cloudsync' // Suffix.
+  ].join('/');
+  // TODO(jasoncampbell): Find docs or something about what this is and what it
+  // does.
+  var info = new syncbase.nosql.SyncGroupMemberInfo({
+    syncPriority: 8
+  });
+
+  db(context, name, ondb);
+
+  function ondb(err, db) {
+    if (err) {
+      callback(err);
+      return;
+    }
+
+    _db = db;
+    table(context, db, 'files', ontable);
+  }
+
+  function ontable(err, table) {
+    if (err) {
+      callback(err);
+      return;
+    }
+
+    var permissions = new window.Map([
+          [ 'Admin',   { 'in': [ '...' ] } ],
+          [ 'Read',    { 'in': [ '...' ] } ],
+          [ 'Write',   { 'in': [ '...' ] } ],
+          [ 'Resolve', { 'in': [ '...' ] } ],
+          [ 'Debug',   { 'in': [ '...' ] } ]
+        ]);
+
+    var spec = new syncbase.nosql.SyncGroupSpec({
+      description: 'reader syncgroup ',
+      perms: permissions,
+      // Prefixes are structured by <table/keyspace>:<key-pattern> where
+      // <key-pattern> matches keys (rows in syncbase vernacular). The rows/keys
+      // are mounted as <name>/reader/db/<table>/<key/row> so the prefixes can
+      // be thought of as a vanadium namespace glob on the last two names with a
+      // different separator.
+      prefixes: [ 'files:c' ],
+      // mountTables: [ ... ] - actually a rendezvous point that is
+      // permissable to mount to by the syncbase instance hosting the sync
+      // group.
+      //
+      // Note this name NEEDS to be rooted.
+      mountTables: [
+        '/ns.dev.v.io:8101/users/' + username + '/reader/rendezvous'
+      ]
+    });
+
+    var ctx = context.withTimeout(ms('5s'));
+    var group = _db.syncGroup(syncname);
+    group.create(ctx, spec, info, oncreate);
+  }
+
+  function oncreate(err) {
+    if (err && !(err instanceof verror.ExistError)) {
+      return callback(err);
+    }
+
+    debug('remote syncbase configured!');
+
+    // Now setup local syncbase to join the remote syncgroup.
+    store.db(function(err, db) {
+      if (err) {
+        return callback(err);
+      }
+
+      var ctx = context.withTimeout(ms('5s'));
+      var group = db.syncGroup(syncname);
+
+      debug('joining syncgroup: %s', syncname);
+      group.join(ctx, info, onjoin);
+    });
+
+    function onjoin(err) {
+      if (err && !(err instanceof verror.ExistError)) {
+        return callback(err);
+      }
+
+      callback(null);
+    }
+  }
+};
diff --git a/browser/vanadium/syncbase/json.js b/browser/vanadium/syncbase/json.js
deleted file mode 100644
index aca175d..0000000
--- a/browser/vanadium/syncbase/json.js
+++ /dev/null
@@ -1,36 +0,0 @@
-// Copyright 2015 The Vanadium Authors. All rights reserved.
-// Use of this source code is governed by a BSD-style
-// license that can be found in the LICENSE file.
-
-var VError = require('verror');
-
-module.exports = {
-  encode: encode,
-  decode: decode
-};
-
-function encode(data, callback) {
-  var string = '';
-
-  try {
-    string = JSON.stringify(data);
-  } catch (e) {
-    var err = new VError(e, 'Failed to encode %o', data);
-    return callback(err);
-  }
-
-  callback(null, string);
-}
-
-function decode(string, callback) {
-  var value;
-
-  try {
-     value = JSON.parse(string);
-  } catch (e) {
-    var err = new VError(e, 'Failed to decode "%s"', string);
-    return callback(err);
-  }
-
-  callback(null, value);
-}
diff --git a/browser/vanadium/syncbase/parse.js b/browser/vanadium/syncbase/parse.js
new file mode 100644
index 0000000..b33fc02
--- /dev/null
+++ b/browser/vanadium/syncbase/parse.js
@@ -0,0 +1,22 @@
+// Copyright 2015 The Vanadium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+var error = require('../error');
+var format = require('util').format;
+
+module.exports = parse;
+
+function parse(string, callback) {
+  var value;
+
+  try {
+     value = JSON.parse(string);
+  } catch (e) {
+    var message = format('Failed to decode "%s"', string);
+    error(e, message, callback);
+    return;
+  }
+
+  callback(null, value);
+}
diff --git a/browser/vanadium/syncbase/pass.js b/browser/vanadium/syncbase/pass.js
new file mode 100644
index 0000000..ce08f42
--- /dev/null
+++ b/browser/vanadium/syncbase/pass.js
@@ -0,0 +1,26 @@
+// Copyright 2015 The Vanadium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+module.exports = pass;
+
+// Creates a worker that simply passes arguments to the next worker in a
+// waterfall series. This helps avoid anti-patterns like creating multiple
+// functions within the calling  closure/scope or excessive use of
+// `fn.bind(...)`.
+//
+// The worker's callback is fired with the error argument set to null.
+//
+// This helper function is meant to be used with the run-waterfall module as the
+// first task.
+//
+// SEE: https://goo.gl/pOSb30
+function pass() {
+  var args = Array.prototype.slice.call(arguments);
+  args.unshift(null);
+  return worker;
+
+  function worker(callback) {
+    callback.apply(null, args);
+  }
+}
diff --git a/browser/vanadium/syncbase/put.js b/browser/vanadium/syncbase/put.js
new file mode 100644
index 0000000..156ef37
--- /dev/null
+++ b/browser/vanadium/syncbase/put.js
@@ -0,0 +1,59 @@
+// Copyright 2015 The Vanadium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+var debug = require('debug')('reader:syncbase:put');
+var equal = require('deep-equal');
+var error = require('../error');
+var format = require('util').format;
+var get = require('./get');
+var ms = require('ms');
+var vanadium = require('vanadium');
+var verror = vanadium.verror;
+
+var options = { strict: true };
+
+module.exports = put;
+
+function put(context, table, data, callback) {
+  get(context, table, data.id, onGet);
+
+  function onGet(err, old) {
+    if (err && !(err instanceof verror.NoExistError)) {
+      callback(err);
+      return;
+    }
+
+    // Skip put calls to syncbase if the content is the same as what exisits.
+    // This prevents recursion on put, watch, state update cycles over p2p sync.
+    if (equal(old, data, options)) {
+      debug('skipping put, old value is the same');
+      callback(null, data);
+      return;
+    }
+
+    var string = '';
+
+    try {
+      string = JSON.stringify(data);
+    } catch (e) {
+      var message = format('Failed to encode "%s"', data);
+      error(e, message, callback);
+      return;
+    }
+
+    var ctx = context.withTimeout(ms('5s'));
+    table.put(ctx, data.id, string, onPut);
+  }
+
+  function onPut(err) {
+    if (err) {
+      var template = 'table: "%s" .put(ctx, "%s", ...) failed';
+      var message = format(template, table.name, data.id);
+      error(err, message, callback);
+      return;
+    }
+
+    callback(null, data);
+  }
+}
diff --git a/browser/vanadium/syncbase/setup-db.js b/browser/vanadium/syncbase/setup-db.js
deleted file mode 100644
index 76d3b9b..0000000
--- a/browser/vanadium/syncbase/setup-db.js
+++ /dev/null
@@ -1,52 +0,0 @@
-// Copyright 2015 The Vanadium Authors. All rights reserved.
-// Use of this source code is governed by a BSD-style
-// license that can be found in the LICENSE file.
-
-var waterfall = require('run-waterfall');
-var vanadium = require('vanadium');
-var verror = vanadium.verror;
-var util = require('../util');
-var syncbase = require('syncbase');
-var util = require('../util');
-var ms = require('ms');
-
-module.exports = setup;
-
-function setup(runtime, name, done) {
-  var tasks = [
-    createApp,
-    createDB
-  ];
-
-  waterfall(tasks, done);
-
-  function createApp(callback) {
-    var service = syncbase.newService(name);
-    var appName = util.parseName(name).app;
-    var app = service.app(appName);
-    var context = util.timeout(runtime, ms('5s'));
-    var permissions = {};
-
-    app.create(context, permissions, function onapp(err) {
-      if (err && !(err instanceof verror.ExistError)) {
-        callback(err);
-      } else {
-        callback(null, app);
-      }
-    });
-  }
-
-  function createDB(app, callback) {
-    var db = app.noSqlDatabase('db');
-    var context = util.timeout(runtime, ms('5s'));
-    var permissions = {};
-
-    db.create(context, permissions, function ondb(err) {
-      if (err && !(err instanceof verror.ExistError)) {
-        callback(err);
-      } else {
-        callback(null, db);
-      }
-    });
-  }
-}
diff --git a/browser/vanadium/syncbase/table.js b/browser/vanadium/syncbase/table.js
new file mode 100644
index 0000000..3588958
--- /dev/null
+++ b/browser/vanadium/syncbase/table.js
@@ -0,0 +1,30 @@
+// Copyright 2015 The Vanadium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+var error = require('../error');
+var format = require('util').format;
+var ms = require('ms');
+var vanadium = require('vanadium');
+var verror = vanadium.verror;
+
+module.exports = createOrGetTable;
+
+function createOrGetTable(context, db, keyspace, callback) {
+  var table = db.table(keyspace);
+  var ctx = context.withTimeout(ms('5s'));
+  var permissions = {};
+
+  table.create(ctx, permissions, onTableCreate);
+
+  function onTableCreate(err) {
+    if (err && !(err instanceof verror.ExistError)) {
+      var template = 'syncbase - db.table("%s").create(...) failed';
+      var message = format(template, keyspace);
+      error(err, message, callback);
+      return;
+    }
+
+    callback(null, table);
+  }
+}
diff --git a/browser/vanadium/util.js b/browser/vanadium/util.js
index d6a981c..f4b54e2 100644
--- a/browser/vanadium/util.js
+++ b/browser/vanadium/util.js
@@ -5,22 +5,9 @@
 var pathToRegexp = require('path-to-regexp');
 
 module.exports = {
-  timeout: timeout,
-  cancel: cancel,
   parseName: parseName
 };
 
-function timeout(runtime, miliseconds) {
-  var context = runtime.getContext();
-  return context.withTimeout(miliseconds || 5000);
-}
-
-function cancel(runtime) {
-  var context = runtime.getContext();
-  return context.withCancel();
-}
-
-
 function parseName(name) {
   var keys = [];
   var re = pathToRegexp('users/:username/:app/:id/:suffix', keys);
diff --git a/package.json b/package.json
index e72249d..30f2f27 100644
--- a/package.json
+++ b/package.json
@@ -33,6 +33,8 @@
     "browserify": "^11.0.1",
     "cuid": "^1.3.8",
     "debug": "^2.2.0",
+    "deep-equal": "^1.0.1",
+    "dezalgo": "^1.0.3",
     "domready": "^1.0.8",
     "end-of-stream": "^1.1.0",
     "extend": "^2.0.1",
@@ -47,6 +49,7 @@
     "once": "^1.3.2",
     "path-to-regexp": "^1.2.1",
     "prr": "^1.0.1",
+    "qs": "^5.1.0",
     "readable-blob-stream": "^1.0.0",
     "routes": "^2.1.0",
     "run-parallel": "^1.1.2",
@@ -58,7 +61,6 @@
     "thunky": "^0.1.0",
     "uuid": "^2.0.1",
     "vanadium": "*",
-    "verror": "^1.6.0",
     "xtend": "^4.0.0"
   }
 }