reader: add support for sync groups.
Several changes to support adding sync groups have been made to
`browser/vanadium/syncbase`. Raw Syncbase API calls have been broken out into
separate, reusable modules for accomplishing simple tasks like getting a
database or a table. All `syncbased` instances are started with very permissive
ACLs to get a minimum working example of sync between peers (SEE: #19). Code has
been added to support watching for changes to originate from sync. Peer
discovery has been temporarily removed since all previous data sharing now goes
through Syncbase.
The browser/vanadium/glob-stream module was refactored due to a serious bug
where failing RPC requests were not being handled correctly. This caused a
recursive loop of uncaught exceptions.
Some additional, smaller changes in this CL.
* Old, non-syncbase storage and transfer code has been removed.
* Added a quick way to change an application id via query param.
* Added a generic error wrapper to add more context to Vanadium RPC errors.
Closes #16
Change-Id: I6d45790de7551386bfcbc6c5ec3b93ef47066fc2
diff --git a/Makefile b/Makefile
index 1979a27..6095f1c 100644
--- a/Makefile
+++ b/Makefile
@@ -13,6 +13,7 @@
host ?= 127.0.0.1
port ?= 8080
syncbase_port ?= 4000
+cloudsync_port ?= 8000
id ?= $(shell if test -e tmp/id; then cat tmp/id; else bin/cuid; fi)
all: public/bundle.js node_modules
@@ -39,7 +40,7 @@
$< 1> $@
.PHONY:
-distclean:
+distclean: clean
@$(RM) -fr node_modules
@v23 goext distclean
@@ -48,8 +49,9 @@
@$(RM) -fr npm-debug.log
@$(RM) -fr public/bundle.js
@$(RM) -fr tmp
- @$(RM) -fr bin/syncbased
- @$(RM) -fr bin/principal
+ @$(RM) -fr credentials
+ @$(RM) -f bin/principal
+ @$(RM) -f bin/syncbased
.PHONY:
lint: node_modules
@@ -100,4 +102,19 @@
--v23.proxy="/ns.dev.v.io:8101/proxy" \
--v23.tcp.address=":$(syncbase_port)" \
--v23.credentials="credentials" \
- --v23.permissions.literal='{"Admin":{"In":["$(blessing)"]},"Write":{"In":["$(blessing)"]},"Read":{"In":["$(blessing)"]},"Resolve":{"In":["$(blessing)"]},"Debug":{"In":["$(blessing)"]}}'
+ --v23.permissions.literal='{"Admin":{"In":["..."]},"Write":{"In":["..."]},"Read":{"In":["..."]},"Resolve":{"In":["..."]},"Debug":{"In":["..."]}}'
+
+.PHONY:
+cloudsync: bin/syncbased credentials tmp
+ $(eval blessing := $(shell bin/principal dump --v23.credentials=./credentials -s=true))
+ $(eval email := $(subst dev.v.io/u/,,$(blessing)))
+ ./bin/syncbased \
+ --v=5 \
+ --alsologtostderr=false \
+ --root-dir="tmp/cloudsync" \
+ --name="users/$(email)/reader/cloudsync" \
+ --v23.namespace.root="/ns.dev.v.io:8101" \
+ --v23.proxy="/ns.dev.v.io:8101/proxy" \
+ --v23.tcp.address=":$(cloudsync_port)" \
+ --v23.credentials="credentials" \
+ --v23.permissions.literal='{"Admin":{"In":["..."]},"Write":{"In":["..."]},"Read":{"In":["..."]},"Resolve":{"In":["..."]},"Debug":{"In":["..."]}}'
diff --git a/README.md b/README.md
index b090509..7b8b937 100644
--- a/README.md
+++ b/README.md
@@ -41,6 +41,28 @@
make syncbase
-This will automatically have you set up credentials etc. If you want to remove stored data & credentials use:
+Run the syncbase instance which hosts the syncgroup:
+
+ make cloudsync
+
+This will run a syncbased instance that will mount as
+"users/<email>/reader/cloudsync" and host the syncgroup. In order for peers to
+sync an instance of this needs to be running somewhere.
+
+To run a new syncbase peer and corresponding application use variables to change
+the startup settings:
+
+ syncbase_port=8888 id=`./bin/cuid` make syncbase
+
+This will generate a new client id and start a new syncbased instance on a
+different port. The generated id can be grabbed from the standard out and will
+look something like this "cif7en1kb00007uigyohv58tx". Once you have the id you
+can open a new browser window and use the id in a query param to initialize the
+application to connect as that peer.
+
+ http://127.0.0.1:8080/?id=<id>
+
+This will automatically have you set up credentials etc. If you want to remove
+stored data & credentials use:
make clean
diff --git a/browser/components/constellation/state.js b/browser/components/constellation/state.js
index f6f6373..aa6f13b 100644
--- a/browser/components/constellation/state.js
+++ b/browser/components/constellation/state.js
@@ -2,13 +2,10 @@
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
-var debug = require('debug')('reader:constellation:state');
var hg = require('mercury');
// Create state for the constellation component
module.exports = function create(options) {
- debug('initialize state');
-
var state = hg.state({
peers: hg.varhash({}, peer),
error: hg.value(null),
diff --git a/browser/components/pdf/state.js b/browser/components/pdf/state.js
index 73dc260..117971c 100644
--- a/browser/components/pdf/state.js
+++ b/browser/components/pdf/state.js
@@ -9,8 +9,6 @@
module.exports = create;
function create(options) {
- debug('creating PDF state: %o', options);
-
var state = hg.state({
error: hg.value(null),
pdf: hg.value(null),
diff --git a/browser/hash-object.js b/browser/hash-object.js
deleted file mode 100644
index e1d0c72..0000000
--- a/browser/hash-object.js
+++ /dev/null
@@ -1,16 +0,0 @@
-// Copyright 2015 The Vanadium Authors. All rights reserved.
-// Use of this source code is governed by a BSD-style
-// license that can be found in the LICENSE file.
-
-var sha256d = require('sha256d');
-
-module.exports = hash;
-
-function hash(data) {
- var h = sha256d();
- var string = JSON.stringify(data);
-
- h.update(string);
-
- return h.digest('hex');
-}
diff --git a/browser/main.js b/browser/main.js
index 95c401a..d135436 100644
--- a/browser/main.js
+++ b/browser/main.js
@@ -21,8 +21,13 @@
var routes = require('./routes');
var vanadium = require('./vanadium');
var window = require('global/window');
+var qs = require('qs');
+var url = require('url');
+// Expose globals for debugging.
window.debug = require('debug');
+window.require = require;
+global.require = require;
domready(function ondomready() {
debug('domready');
@@ -38,30 +43,53 @@
// TODO(jasoncampbell): add an error component for aggregating, logging, and
// displaying errors in the UI.
- state.error(function(err) {
+ state.error(function onstateerror(err) {
if (!err) {
return;
}
- console.error('TODO: add an error component');
+ console.error('TODO(jasoncampbell): add an error component');
console.error(err.stack);
+
+ for (var key in err) {
+ if (err.hasOwnProperty(key)) {
+ console.error('* %s => %o', key, err[key]);
+ }
+ }
});
router(state, routes).on('notfound', notfound);
+ // Quick way to change the id of the running application using a query param.
+ // TODO(jasoncampbell): Create a configuration screen/component.
+ var query = url.parse(window.location.href).query;
+ var id = qs.parse(query).id || process.env.ID;
+
+ debug('##### %s #####', id);
+
// The vanadium client is coupled to the application state here so that async
// code paths in the ./vanadium modules can be isolated to the application
// initialization. This allows components to be separately tested/interacted
// with as mappings between data and UI without being tangled into the
// local vanadium discovery process.
- var client = vanadium({
- id: process.env.ID,
- state: state.constellation
+ var client = vanadium({ id: id });
+
+ client.on('error', function onvanadiumerror(err) {
+ state.error.set(err);
});
client.on('syncbase', function onsyncbase(store) {
state.store.set(store);
+ store.sync(function onsync(err) {
+ if (err) {
+ state.error.set(err);
+ return;
+ }
+
+ debug('store.sync succeeded!');
+ });
+
// Setup watch.
var ws = store.createWatchStream('files');
@@ -71,14 +99,23 @@
}
});
- ws.on('data', function(change) {
+ ws.on('data', function onwatchchange(change) {
debug('watch stream change: %o', change);
+ // NOTE: this triggers a recursion between clients :(
+
+ if (change.type === 'put') {
+ state.files.collection.put(change.key, change.value);
+ }
+
+ if (change.type === 'delete') {
+ state.files.collection.delete(change.key);
+ }
});
// Scan all keys and populate state.
var stream = store.createReadStream('files');
- stream.on('data', function(data) {
+ stream.on('data', function onreadstreamdata(data) {
state.files.collection.put(data.key, data.value);
});
@@ -113,8 +150,6 @@
});
each(collection, function(key, value) {
- debug('each iterator: %o', value);
-
store.put('files', value, function callback(err, file) {
if (err) {
state.error.set(err);
diff --git a/browser/vanadium/error.js b/browser/vanadium/error.js
new file mode 100644
index 0000000..50619fd
--- /dev/null
+++ b/browser/vanadium/error.js
@@ -0,0 +1,17 @@
+// Copyright 2015 The Vanadium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+module.exports = error;
+
+function error(err, message, callback) {
+ var werr = new Error(message);
+ werr.cause = err;
+
+ if (callback) {
+ callback(werr);
+ return;
+ }
+
+ return werr;
+}
diff --git a/browser/vanadium/glob-stream.js b/browser/vanadium/glob-stream.js
index 68e220a..6719ab0 100644
--- a/browser/vanadium/glob-stream.js
+++ b/browser/vanadium/glob-stream.js
@@ -3,23 +3,37 @@
// license that can be found in the LICENSE file.
var assert = require('assert');
+var eos = require('end-of-stream');
+var error = require('./error');
+var format = require('util').format;
var ms = require('ms');
+var once = require('once');
var through = require('through2');
module.exports = createStream;
-// Returns a flowing glob stream that emits vanadium names. If options.name is
-// present, the stream will recursively call glob until that name is discovered.
+// Returns a flowing glob stream that emits vanadium names, will recursively
+// glob until options.name is discovered.
function createStream(options) {
var found = false;
var stream = through(write);
+ var runtime = options.runtime;
+ var pattern = options.pattern;
assert.ok(options.name, 'options.name is required');
- glob(options, stream, done);
+ _glob();
return stream;
+ function _glob() {
+ var gs = glob(runtime, pattern);
+
+ eos(gs, end);
+ gs.pipe(stream);
+ }
+
+ // Track if options.name was found.
function write(buffer, enc, callback) {
if (options.name === buffer.toString()) {
found = true;
@@ -28,56 +42,43 @@
callback(null, buffer);
}
- function done(err) {
+ function end(err) {
if (err) {
stream.emit('error', err);
- stream.end();
+ return;
}
if (found) {
stream.end();
} else {
- glob(options, stream, done);
+ _glob(runtime, pattern);
}
}
}
-// This function will do a single run of namespace.glob and proxy data events to
-// the passed in stream, the callback will be called when the stream has
-// finished.
-//
-// NOTE: recursive polling of the mountable eats about 10% of my CPU on a
-// macbook air.
-function glob(options, stream, done) {
- assert.ok(options, 'options object is required');
- assert.ok(options.runtime, 'options.runtime is required');
- assert.ok(options.pattern, 'options.pattern is required');
-
- var runtime = options.runtime;
+function glob(runtime, pattern) {
var namespace = runtime.getNamespace();
- var context = runtime.getContext().withTimeout(options.timeout || ms('12s'));
- var promise = namespace.glob(context, options.pattern);
+ var context = runtime.getContext();
+ var ctx = context.withTimeout(ms('10s'));
+ var done = once(end);
+ var stream = through.obj(write);
- promise.catch(function(err) {
- throw err;
- });
+ var gs = namespace.glob(ctx, pattern, done).stream;
- // NOTE: The return value from .glob is a promise, to access the stream use
- // the .stream attribute.
- promise.stream.on('data', function onentry(entry) {
- stream.write(entry.name);
- });
+ eos(gs, done);
+ gs.pipe(stream);
- // NOTE: Piping namespace.glob streams causes all kinds of errors which do not
- // directly related to the stream being ended.
- // TODO(jasoncampbell): This thing leaks random errors like crazy, this should
- // get fixed so that errors are related and actionable and tools like
- // end-of-stream can be used.
- promise.stream.on('error', function onerror(err) {
- throw err;
- // maybe proxying errors wont be that bad if vanadium errors can be detected
- // and split out...
- });
+ return stream;
- promise.stream.on('end', done);
+ function end(err) {
+ if (err) {
+ var message = format('Globbing "%s" failed', pattern);
+ stream.emit('error', error(err, message));
+ }
+ }
+
+ // Transform Glob entries to single names.
+ function write(entry, enc, cb) {
+ cb(null, entry.name);
+ }
}
diff --git a/browser/vanadium/index.js b/browser/vanadium/index.js
index bbfb970..2f29511 100644
--- a/browser/vanadium/index.js
+++ b/browser/vanadium/index.js
@@ -3,31 +3,29 @@
// license that can be found in the LICENSE file.
var assert = require('assert');
-var BlobReader = require('readable-blob-stream');
var debug = require('debug')('reader:vanadium');
+var eos = require('end-of-stream');
var EventEmitter = require('events').EventEmitter;
-var filter = require('./filter-stream');
var glob = require('./glob-stream');
var inherits = require('inherits');
var ms = require('ms');
var once = require('once');
-var parallel = require('run-parallel');
+var prr = require('prr');
var service = require('./service');
-var through = require('through2');
+var syncbase = require('./syncbase');
var vanadium = require('vanadium');
var waterfall = require('run-waterfall');
var window = require('global/window');
-var syncbase = require('./syncbase');
-var prr = require('prr');
module.exports = connect;
function connect(options) {
var client = new Client(options);
- client.discover(function onmount(err, stream) {
+ client.mount(function onmount(err) {
if (err) {
- return options.error.set(err);
+ client.emit('error', err);
+ return;
}
debug('discovery is setup');
@@ -51,7 +49,6 @@
client.name = '';
client.mounted = false;
client.runtime = {};
- client.peers = options.state.peers;
client.service = service(client);
// TODO(jasoncampbell): Come up with a better way to couple the bare service
@@ -67,14 +64,13 @@
var store = syncbase(options);
prr(client, 'syncbase', store);
client.emit('syncbase', store);
+ debug('runtime is available');
});
}
inherits(Client, EventEmitter);
-Client.prototype.discover = function(callback) {
- debug('initializing discovery');
-
+Client.prototype.mount = function(callback) {
var client = this;
var workers = [
client.init.bind(client),
@@ -93,7 +89,6 @@
};
Client.prototype.init = function(callback) {
- debug('calling init');
var client = this;
vanadium.init({
@@ -106,8 +101,6 @@
return callback(err);
}
- debug('runtime is ready');
-
// TODO(jasoncampbell): When this happens the window really, really needs to
// be reloaded. In order to safely reload the page state should be stored or
// serialized in a way that makes it recoverable for this error case.
@@ -123,8 +116,6 @@
// <prefix>/reader/syncgroup
var name = getName(runtime, client.id);
- debug('name: %s', name);
-
client.name = name;
client.emit('runtime', runtime);
@@ -137,6 +128,7 @@
var client = this;
var service = client.service;
+ debug('serve: %s', client.name);
runtime.newServer(client.name, service, onserve);
function onserve(err, server) {
@@ -144,8 +136,6 @@
return callback(err);
}
- debug('service is ready');
-
window.addEventListener = window.addEventListener || noop;
window.addEventListener('beforeunload', beforeunload);
@@ -165,14 +155,13 @@
}
};
-Client.prototype.glob = function(runtime, onmount) {
- onmount = once(onmount);
+// Globs until mounted.
+Client.prototype.glob = function(runtime, callback) {
+ callback = once(callback);
var client = this;
- var peers = client.peers;
// Glob pattern based on "<prefix>/reader/:id/app"
var pattern = prefix(runtime).replace('/chrome', '') + '/reader/*/app';
-
var stream = glob({
name: client.name,
runtime: runtime,
@@ -180,30 +169,37 @@
timeout: ms('12s')
});
- stream.on('error', function(err) {
- debug('glob-stream error: %s', err.stack);
- client.emit('error', err);
- });
+ // Glob stream ends once the mounted client.name has been discovered or there
+ // is an error.
+ //
+ // NOTE: Streaming names through service connection has been commented out,
+ // this will will be addressed later as the previous client/service model is
+ // unessecary given the way syncbase is being leveraged.
+ //
+ // TODO(jasoncampbell): Hook up a simple discovery mechanism to detect new
+ // peers and share information about constellation state like disconnects.
+ eos(stream, { readable: false }, callback);
- stream
- .pipe(filter(peers))
- .pipe(through(write))
- .on('error', function(err) {
- debug('peer-stream error: %s', err.stack);
- client.emit('error', err);
- });
- function write(buffer, enc, cb) {
- var name = buffer.toString();
-
- client.connect(name);
-
- if (name === client.name) {
- onmount();
- }
-
- cb(null, buffer);
- }
+ // stream
+ // .pipe(filter(peers))
+ // .pipe(through(write))
+ // .on('error', function(err) {
+ // debug('peer-stream error: %s', err.stack);
+ // client.emit('error', err);
+ // });
+ //
+ // function write(buffer, enc, cb) {
+ // var name = buffer.toString();
+ //
+ // client.connect(name);
+ //
+ // if (name === client.name) {
+ // onmount();
+ // }
+ //
+ // cb(null, buffer);
+ // }
};
Client.prototype.connect = function(name) {
@@ -310,48 +306,6 @@
return tasks;
};
-Client.prototype.sendPDF = function(key, file, callback) {
- var client = this;
- var runtime = client.runtime;
- var context = runtime.getContext();
- var tasks = client.remotes('connected', createWorker);
- var meta = {
- hash: key,
- name: file.name,
- size: file.size,
- type: file.type
- };
-
- // Execute tasks across peers in parallel.
- parallel(tasks, function done(err, results) {
- if (err) {
- callback(err);
- return;
- }
-
- callback(null, results);
- });
-
- function createWorker(peer) {
- debug('created worker for remote %s', peer.uuid);
- return worker;
-
- function worker(callback) {
- callback = once(callback);
- debug('sending PDF (%d bytes) to %s', file.size, peer.uuid);
-
- var promise = peer.remote.savePDF(context, meta, callback);
- var stream = promise.stream;
- var bs = new BlobReader(file);
-
- bs.on('error', callback);
- stream.on('end', callback);
-
- bs.pipe(stream);
- }
- }
-};
-
// TODO(jasoncampbell): Move naming related code into a separate module.
function getName(runtime, id) {
var p = prefix(runtime).replace('/chrome', '');
diff --git a/browser/vanadium/syncbase/db.js b/browser/vanadium/syncbase/db.js
new file mode 100644
index 0000000..3b95e17
--- /dev/null
+++ b/browser/vanadium/syncbase/db.js
@@ -0,0 +1,62 @@
+// Copyright 2015 The Vanadium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+var error = require('../error');
+var format = require('util').format;
+var ms = require('ms');
+var pass = require('./pass');
+var syncbase = require('syncbase');
+var vanadium = require('vanadium');
+var verror = vanadium.verror;
+var waterfall = require('run-waterfall');
+
+module.exports = create;
+
+// Creates the syncbase structure up to the databse level. The database will be
+// mounted as: <name>/reader/db.
+function create(context, name, done) {
+ var tasks = [
+ pass(context, name),
+ createApp,
+ createDB
+ ];
+
+ waterfall(tasks, done);
+}
+
+function createApp(context, name, callback) {
+ var service = syncbase.newService(name);
+ var app = service.app('reader');
+ var ctx = context.withTimeout(ms('5s'));
+ var permissions = {};
+
+ app.create(ctx, permissions, onapp);
+
+ function onapp(err) {
+ if (err && !(err instanceof verror.ExistError)) {
+ var template = 'syncbase - app.create(...) failed\n * name: %s';
+ var message = format(template, name);
+ error(err, message, callback);
+ } else {
+ callback(null, context, app);
+ }
+ }
+}
+
+function createDB(context, app, callback) {
+ var db = app.noSqlDatabase('db');
+ var ctx = context.withTimeout(ms('5s'));
+ var permissions = {};
+
+ db.create(ctx, permissions, ondb);
+
+ function ondb(err) {
+ if (err && !(err instanceof verror.ExistError)) {
+ var message = format('syncbase - db.create(...) failed');
+ error(err, message, callback);
+ } else {
+ callback(null, db);
+ }
+ }
+}
diff --git a/browser/vanadium/syncbase/get.js b/browser/vanadium/syncbase/get.js
new file mode 100644
index 0000000..53c11c3
--- /dev/null
+++ b/browser/vanadium/syncbase/get.js
@@ -0,0 +1,35 @@
+// Copyright 2015 The Vanadium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+var ms = require('ms');
+var parse = require('./parse');
+var pass = require('./pass');
+var waterfall = require('run-waterfall');
+
+module.exports = getData;
+
+function getData(context, table, key, callback) {
+ var tasks = [
+ pass(context, table, key),
+ get,
+ parse
+ ];
+
+ waterfall(tasks, callback);
+}
+
+function get(context, table, key, callback) {
+ var ctx = context.withTimeout(ms('5s'));
+
+ table.get(ctx, key, onGet);
+
+ function onGet(err, string) {
+ if (err) {
+ callback(err);
+ return;
+ }
+
+ callback(null, string);
+ }
+}
diff --git a/browser/vanadium/syncbase/index.js b/browser/vanadium/syncbase/index.js
index bc31e0a..d717111 100644
--- a/browser/vanadium/syncbase/index.js
+++ b/browser/vanadium/syncbase/index.js
@@ -4,17 +4,23 @@
var assert = require('assert');
var BlobReader = require('readable-blob-stream');
+var db = require('./db');
var debug = require('debug')('reader:syncbase');
+var dz = require('dezalgo');
var eos = require('end-of-stream');
+var error = require('../error');
var EventEmitter = require('events').EventEmitter;
var extend = require('xtend');
+var format = require('util').format;
+var get = require('./get');
var inherits = require('inherits');
-var json = require('./json');
var ms = require('ms');
var once = require('once');
+var parse = require('./parse');
var prr = require('prr');
-var setup = require('./setup-db');
+var put = require('./put');
var syncbase = require('syncbase');
+var table = require('./table');
var through = require('through2');
var util = require('../util');
var vanadium = require('vanadium');
@@ -23,8 +29,14 @@
module.exports = Store;
-// Naming for the DB setup looks like <app-name>/<db-name>/<table-name> and
-// <db-name> is hard coded to "db".
+// Naming for the Syncbase setup looks like <app-name>/<db>/<table>, these
+// values are hard-coded in ./db.js and result in serices being mounted with the
+// following structure:
+//
+// <name>/reader/db/<table>
+//
+// The app, database, table setup is lazily evaluated and will create missing
+// components in the corresponding syncbase if they are missing.
function Store(options) {
if (!(this instanceof Store)) {
return new Store(options);
@@ -39,76 +51,97 @@
prr(store, 'runtime', options.runtime);
prr(store, 'name', options.name, 'e');
- prr(store, 'tables', {}, 'e');
+ prr(store, '_tables', {});
+ prr(store, '_db', {}, 'c');
+
+ store._status = 'new';
+ store.on('status', onstatus);
+
+ function onstatus(status) {
+ debug('status: %s', status);
+ store._status = status;
+ }
}
inherits(Store, EventEmitter);
-Store.prototype.db = function(callback) {
+Store.prototype.status = function(status) {
var store = this;
- var runtime = store.runtime;
- var name = store.name;
-
- if (store._db) {
- return process.nextTick(function next() {
- callback(null, store._db);
- });
- }
-
- setup(runtime, name, function onsetup(err, db) {
- if (err) {
- callback(err);
- return;
- }
-
- // NOTE: It's possible for this async call to happen in paralell and a
- // previos callback to have fired already. If that is the case do not re-set
- // `store._db`.
- // TODO: Wrap calls with some logic where pending callbacks can
- // be triggered without making multiple `setup` calls.
- if (! store._db) {
- prr(store, '_db', db);
- }
-
- callback(null, db);
- });
+ return store._status === status;
};
-Store.prototype.table = function(name, callback) {
+Store.prototype.db = function(callback) {
+ callback = dz(callback);
+
var store = this;
- if (!!store.tables[name]) {
- return process.nextTick(function next() {
- callback(null, store.tables[name]);
- });
+ if (store.status('ready')) {
+ callback(null, store._db);
+ return;
}
- store.db(function ondb(err, db) {
+ // In case a previous call is in process it's callback will handle the error
+ // case, but current calls will need to be queued. If the in-process call is
+ // successful queued callbacks can be called via the "db" event.
+ if (store.status('initializing')) {
+ store.once('db', callback.bind(null, null));
+ return;
+ }
+
+ store.emit('status', 'initializing');
+
+ var context = store.runtime.getContext();
+ var name = store.name;
+ db(context, name, ondb);
+
+ function ondb(err, db) {
if (err) {
callback(err);
return;
}
- var table = db.table(name);
- var runtime = store.runtime;
- var context = util.timeout(runtime, ms('5s'));
- var permissions = {};
+ prr(store, '_db', db);
+ store.emit('db', db);
+ store.emit('status', 'ready');
+ callback(null, db);
+ }
+};
- table.create(context, permissions, function ontable(err) {
- if (err && !(err instanceof verror.ExistError)) {
- callback(err);
- return;
- }
+Store.prototype.table = function(keyspace, callback) {
+ callback = dz(callback);
- store.tables[name] = table;
+ var store = this;
+ var cached = store._tables[keyspace];
- callback(null, table);
- });
- });
+ if (cached) {
+ callback(null, cached);
+ return;
+ }
+
+ store.db(ondb);
+
+ function ondb(err, db) {
+ if (err) {
+ callback(err);
+ return;
+ }
+
+ var context = store.runtime.getContext();
+ table(context, db, keyspace, ontable);
+ }
+
+ function ontable(err, table) {
+ if (err) {
+ callback(err);
+ return;
+ }
+
+ store._tables[keyspace] = table;
+ callback(null, table);
+ }
};
Store.prototype.get = function(keyspace, key, callback) {
- debug('#get("%s", "%s", callback)', keyspace, key);
var store = this;
store.table(keyspace, ontable);
@@ -119,27 +152,27 @@
return;
}
- var runtime = store.runtime;
- var context = util.timeout(runtime, ms('5s'));
-
- table.get(context, key, onget);
+ var context = store.runtime.getContext();
+ get(context, table, key, onget);
}
- function onget(err, string) {
+ function onget(err, data) {
if (err) {
- callback(err);
+ var template = 'syncbase #get("%s", "%s", callback) failed';
+ var message = format(template, keyspace, key);
+ error(err, message, callback);
return;
}
- json.decode(string, callback);
+ callback(null, data);
}
};
-Store.prototype.put = function(keyspace, value, callback) {
- debug('#put("%s", %o, callback)', keyspace, value);
- var store = this;
+Store.prototype.put = function(keyspace, data, callback) {
+ assert.ok(data.id, 'data.id is required.');
- assert.ok(value.id, 'item.id is required.');
+ var store = this;
+ var context = store.runtime.getContext();
store.table(keyspace, ontable);
@@ -148,58 +181,39 @@
return callback(err);
}
- if (keyspace === 'files' && !value.ref) {
- assert.ok(value.blob, 'item.blob is required.');
- store.putBlob(value.blob, onref);
+ if (keyspace === 'files' && !data.ref) {
+ store.putBlob(data.blob, onref);
} else {
- put(table, value, callback);
+ put(context, table, data, callback);
}
+ return;
+
function onref(err, ref) {
if (err) {
return callback(err);
}
- value.ref = ref;
- put(table, value, callback);
- }
-
- function put(table, value) {
- var key = value.id;
- var runtime = store.runtime;
- var context = util.timeout(runtime, ms('5s'));
-
- json.encode(value, function onjson(err, string){
- table.put(context, key, string, onput);
- });
-
- function onput(err) {
- if (err) {
- return callback(err);
- }
-
- callback(null, value);
- }
+ data.ref = ref;
+ put(context, table, data, callback);
}
}
};
Store.prototype.del = function(keyspace, key, callback) {
- debug('#del("%s", "%s", callback)', keyspace, key);
var store = this;
store.table(keyspace, ontable);
function ontable(err, table) {
- var runtime = store.runtime;
- var context = util.timeout(runtime, ms('5s'));
+ var context = store.runtime.getContext();
+ var ctx = context.withTimeout(ms('5s'));
- table.delete(context, key, callback);
+ table.delete(ctx, key, callback);
}
};
Store.prototype.putBlob = function(blob, callback) {
- debug('#putBlob(%o, callback)', blob);
assert.ok(blob instanceof window.Blob, 'Must use a Blob object.');
var store = this;
@@ -211,10 +225,9 @@
return callback(err);
}
- var runtime = store.runtime;
- var context = util.timeout(runtime, ms('5s'));
-
- db.createBlob(context, onblob);
+ var context = store.runtime.getContext();
+ var ctx = context.withTimeout(ms('5s'));
+ db.createBlob(ctx, onblob);
}
function onblob(err, vblob) {
@@ -224,10 +237,10 @@
}
var reader = new BlobReader(blob);
- var runtime = store.runtime;
- var context = util.timeout(runtime, ms('5s'));
+ var context = store.runtime.getContext();
+ var ctx = context.withTimeout(ms('5s'));
var done = once(onput);
- var writer = vblob.put(context, done);
+ var writer = vblob.put(ctx, done);
eos(reader, done);
eos(writer, { readable: false }, done);
@@ -240,10 +253,9 @@
return;
}
- var runtime = store.runtime;
- var context = util.timeout(runtime, ms('5s'));
-
- vblob.commit(context, function oncommit(err) {
+ var context = store.runtime.getContext();
+ var ctx = context.withTimeout(ms('5s'));
+ vblob.commit(ctx, function oncommit(err) {
if (err) {
callback(err);
return;
@@ -274,11 +286,11 @@
return;
}
- var runtime = store.runtime;
- var context = util.timeout(runtime, ms('5s'));
var range = syncbase.nosql.rowrange.range(options.start, options.limit);
var done = once(end);
- var reader = table.scan(context, range, done);
+ var context = store.runtime.getContext();
+ var ctx = context.withTimeout(ms('5s'));
+ var reader = table.scan(ctx, range, done);
eos(reader, done);
reader.pipe(stream);
@@ -290,15 +302,17 @@
key: chunk.key
};
- json.decode(chunk.value, function onvalue(err, value) {
+ parse(chunk.value, onparse);
+
+ function onparse(err, data) {
if (err) {
callback(err);
return;
}
- item.value = value;
+ item.value = data;
callback(null, item);
- });
+ }
}
function end(err) {
@@ -323,10 +337,9 @@
return;
}
- var runtime = store.runtime;
- var context = util.timeout(runtime, ms('5s'));
-
- db.getResumeMarker(context, onmarker);
+ var context = store.runtime.getContext();
+ var ctx = context.withTimeout(ms('5s'));
+ db.getResumeMarker(ctx, onmarker);
function onmarker(err, marker) {
if (err) {
@@ -334,11 +347,11 @@
return;
}
- var runtime = store.runtime;
- var context = util.cancel(runtime);
var prefix = '';
var done = once(end);
- var reader = db.watch(context, keyspace, prefix, marker, done);
+ var context = store.runtime.getContext();
+ var ctx = context.withCancel();
+ var reader = db.watch(ctx, keyspace, prefix, marker, done);
eos(reader, reader);
reader.pipe(stream);
@@ -352,6 +365,8 @@
return;
}
+ debug('change: %o', chunk);
+
var change = {
type: chunk.changeType,
key: chunk.rowName,
@@ -373,8 +388,9 @@
return;
}
+ debug('get success: %o', value);
change.value = value;
- callback(null, value);
+ callback(null, change);
}
}
@@ -385,3 +401,103 @@
}
}
};
+
+Store.prototype.sync = function(callback) {
+ var _db;
+
+ var store = this;
+ var username = util.parseName(store.name).username;
+ var context = store.runtime.getContext();
+ // The Vanadium name of the running syncbase instance that hosts the sync
+ // group.
+ var name = 'users/' + username + '/reader/cloudsync';
+ var syncname = [
+ name,
+ '%%sync', // Syncbase naming scheme.
+ 'cloudsync' // Suffix.
+ ].join('/');
+ // TODO(jasoncampbell): Find docs or something about what this is and what it
+ // does.
+ var info = new syncbase.nosql.SyncGroupMemberInfo({
+ syncPriority: 8
+ });
+
+ db(context, name, ondb);
+
+ function ondb(err, db) {
+ if (err) {
+ callback(err);
+ return;
+ }
+
+ _db = db;
+ table(context, db, 'files', ontable);
+ }
+
+ function ontable(err, table) {
+ if (err) {
+ callback(err);
+ return;
+ }
+
+ var permissions = new window.Map([
+ [ 'Admin', { 'in': [ '...' ] } ],
+ [ 'Read', { 'in': [ '...' ] } ],
+ [ 'Write', { 'in': [ '...' ] } ],
+ [ 'Resolve', { 'in': [ '...' ] } ],
+ [ 'Debug', { 'in': [ '...' ] } ]
+ ]);
+
+ var spec = new syncbase.nosql.SyncGroupSpec({
+ description: 'reader syncgroup ',
+ perms: permissions,
+ // Prefixes are structured by <table/keyspace>:<key-pattern> where
+ // <key-pattern> matches keys (rows in syncbase vernacular). The rows/keys
+ // are mounted as <name>/reader/db/<table>/<key/row> so the prefixes can
+ // be thought of as a vanadium namespace glob on the last two names with a
+ // different separator.
+ prefixes: [ 'files:c' ],
+ // mountTables: [ ... ] - actually a rendezvous point that is
+ // permissable to mount to by the syncbase instance hosting the sync
+ // group.
+ //
+ // Note this name NEEDS to be rooted.
+ mountTables: [
+ '/ns.dev.v.io:8101/users/' + username + '/reader/rendezvous'
+ ]
+ });
+
+ var ctx = context.withTimeout(ms('5s'));
+ var group = _db.syncGroup(syncname);
+ group.create(ctx, spec, info, oncreate);
+ }
+
+ function oncreate(err) {
+ if (err && !(err instanceof verror.ExistError)) {
+ return callback(err);
+ }
+
+ debug('remote syncbase configured!');
+
+ // Now setup local syncbase to join the remote syncgroup.
+ store.db(function(err, db) {
+ if (err) {
+ return callback(err);
+ }
+
+ var ctx = context.withTimeout(ms('5s'));
+ var group = db.syncGroup(syncname);
+
+ debug('joining syncgroup: %s', syncname);
+ group.join(ctx, info, onjoin);
+ });
+
+ function onjoin(err) {
+ if (err && !(err instanceof verror.ExistError)) {
+ return callback(err);
+ }
+
+ callback(null);
+ }
+ }
+};
diff --git a/browser/vanadium/syncbase/json.js b/browser/vanadium/syncbase/json.js
deleted file mode 100644
index aca175d..0000000
--- a/browser/vanadium/syncbase/json.js
+++ /dev/null
@@ -1,36 +0,0 @@
-// Copyright 2015 The Vanadium Authors. All rights reserved.
-// Use of this source code is governed by a BSD-style
-// license that can be found in the LICENSE file.
-
-var VError = require('verror');
-
-module.exports = {
- encode: encode,
- decode: decode
-};
-
-function encode(data, callback) {
- var string = '';
-
- try {
- string = JSON.stringify(data);
- } catch (e) {
- var err = new VError(e, 'Failed to encode %o', data);
- return callback(err);
- }
-
- callback(null, string);
-}
-
-function decode(string, callback) {
- var value;
-
- try {
- value = JSON.parse(string);
- } catch (e) {
- var err = new VError(e, 'Failed to decode "%s"', string);
- return callback(err);
- }
-
- callback(null, value);
-}
diff --git a/browser/vanadium/syncbase/parse.js b/browser/vanadium/syncbase/parse.js
new file mode 100644
index 0000000..b33fc02
--- /dev/null
+++ b/browser/vanadium/syncbase/parse.js
@@ -0,0 +1,22 @@
+// Copyright 2015 The Vanadium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+var error = require('../error');
+var format = require('util').format;
+
+module.exports = parse;
+
+function parse(string, callback) {
+ var value;
+
+ try {
+ value = JSON.parse(string);
+ } catch (e) {
+ var message = format('Failed to decode "%s"', string);
+ error(e, message, callback);
+ return;
+ }
+
+ callback(null, value);
+}
diff --git a/browser/vanadium/syncbase/pass.js b/browser/vanadium/syncbase/pass.js
new file mode 100644
index 0000000..ce08f42
--- /dev/null
+++ b/browser/vanadium/syncbase/pass.js
@@ -0,0 +1,26 @@
+// Copyright 2015 The Vanadium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+module.exports = pass;
+
+// Creates a worker that simply passes arguments to the next worker in a
+// waterfall series. This helps avoid anti-patterns like creating multiple
+// functions within the calling closure/scope or excessive use of
+// `fn.bind(...)`.
+//
+// The worker's callback is fired with the error argument set to null.
+//
+// This helper function is meant to be used with the run-waterfall module as the
+// first task.
+//
+// SEE: https://goo.gl/pOSb30
+function pass() {
+ var args = Array.prototype.slice.call(arguments);
+ args.unshift(null);
+ return worker;
+
+ function worker(callback) {
+ callback.apply(null, args);
+ }
+}
diff --git a/browser/vanadium/syncbase/put.js b/browser/vanadium/syncbase/put.js
new file mode 100644
index 0000000..156ef37
--- /dev/null
+++ b/browser/vanadium/syncbase/put.js
@@ -0,0 +1,59 @@
+// Copyright 2015 The Vanadium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+var debug = require('debug')('reader:syncbase:put');
+var equal = require('deep-equal');
+var error = require('../error');
+var format = require('util').format;
+var get = require('./get');
+var ms = require('ms');
+var vanadium = require('vanadium');
+var verror = vanadium.verror;
+
+var options = { strict: true };
+
+module.exports = put;
+
+function put(context, table, data, callback) {
+ get(context, table, data.id, onGet);
+
+ function onGet(err, old) {
+ if (err && !(err instanceof verror.NoExistError)) {
+ callback(err);
+ return;
+ }
+
+ // Skip put calls to syncbase if the content is the same as what exisits.
+ // This prevents recursion on put, watch, state update cycles over p2p sync.
+ if (equal(old, data, options)) {
+ debug('skipping put, old value is the same');
+ callback(null, data);
+ return;
+ }
+
+ var string = '';
+
+ try {
+ string = JSON.stringify(data);
+ } catch (e) {
+ var message = format('Failed to encode "%s"', data);
+ error(e, message, callback);
+ return;
+ }
+
+ var ctx = context.withTimeout(ms('5s'));
+ table.put(ctx, data.id, string, onPut);
+ }
+
+ function onPut(err) {
+ if (err) {
+ var template = 'table: "%s" .put(ctx, "%s", ...) failed';
+ var message = format(template, table.name, data.id);
+ error(err, message, callback);
+ return;
+ }
+
+ callback(null, data);
+ }
+}
diff --git a/browser/vanadium/syncbase/setup-db.js b/browser/vanadium/syncbase/setup-db.js
deleted file mode 100644
index 76d3b9b..0000000
--- a/browser/vanadium/syncbase/setup-db.js
+++ /dev/null
@@ -1,52 +0,0 @@
-// Copyright 2015 The Vanadium Authors. All rights reserved.
-// Use of this source code is governed by a BSD-style
-// license that can be found in the LICENSE file.
-
-var waterfall = require('run-waterfall');
-var vanadium = require('vanadium');
-var verror = vanadium.verror;
-var util = require('../util');
-var syncbase = require('syncbase');
-var util = require('../util');
-var ms = require('ms');
-
-module.exports = setup;
-
-function setup(runtime, name, done) {
- var tasks = [
- createApp,
- createDB
- ];
-
- waterfall(tasks, done);
-
- function createApp(callback) {
- var service = syncbase.newService(name);
- var appName = util.parseName(name).app;
- var app = service.app(appName);
- var context = util.timeout(runtime, ms('5s'));
- var permissions = {};
-
- app.create(context, permissions, function onapp(err) {
- if (err && !(err instanceof verror.ExistError)) {
- callback(err);
- } else {
- callback(null, app);
- }
- });
- }
-
- function createDB(app, callback) {
- var db = app.noSqlDatabase('db');
- var context = util.timeout(runtime, ms('5s'));
- var permissions = {};
-
- db.create(context, permissions, function ondb(err) {
- if (err && !(err instanceof verror.ExistError)) {
- callback(err);
- } else {
- callback(null, db);
- }
- });
- }
-}
diff --git a/browser/vanadium/syncbase/table.js b/browser/vanadium/syncbase/table.js
new file mode 100644
index 0000000..3588958
--- /dev/null
+++ b/browser/vanadium/syncbase/table.js
@@ -0,0 +1,30 @@
+// Copyright 2015 The Vanadium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+var error = require('../error');
+var format = require('util').format;
+var ms = require('ms');
+var vanadium = require('vanadium');
+var verror = vanadium.verror;
+
+module.exports = createOrGetTable;
+
+function createOrGetTable(context, db, keyspace, callback) {
+ var table = db.table(keyspace);
+ var ctx = context.withTimeout(ms('5s'));
+ var permissions = {};
+
+ table.create(ctx, permissions, onTableCreate);
+
+ function onTableCreate(err) {
+ if (err && !(err instanceof verror.ExistError)) {
+ var template = 'syncbase - db.table("%s").create(...) failed';
+ var message = format(template, keyspace);
+ error(err, message, callback);
+ return;
+ }
+
+ callback(null, table);
+ }
+}
diff --git a/browser/vanadium/util.js b/browser/vanadium/util.js
index d6a981c..f4b54e2 100644
--- a/browser/vanadium/util.js
+++ b/browser/vanadium/util.js
@@ -5,22 +5,9 @@
var pathToRegexp = require('path-to-regexp');
module.exports = {
- timeout: timeout,
- cancel: cancel,
parseName: parseName
};
-function timeout(runtime, miliseconds) {
- var context = runtime.getContext();
- return context.withTimeout(miliseconds || 5000);
-}
-
-function cancel(runtime) {
- var context = runtime.getContext();
- return context.withCancel();
-}
-
-
function parseName(name) {
var keys = [];
var re = pathToRegexp('users/:username/:app/:id/:suffix', keys);
diff --git a/package.json b/package.json
index e72249d..30f2f27 100644
--- a/package.json
+++ b/package.json
@@ -33,6 +33,8 @@
"browserify": "^11.0.1",
"cuid": "^1.3.8",
"debug": "^2.2.0",
+ "deep-equal": "^1.0.1",
+ "dezalgo": "^1.0.3",
"domready": "^1.0.8",
"end-of-stream": "^1.1.0",
"extend": "^2.0.1",
@@ -47,6 +49,7 @@
"once": "^1.3.2",
"path-to-regexp": "^1.2.1",
"prr": "^1.0.1",
+ "qs": "^5.1.0",
"readable-blob-stream": "^1.0.0",
"routes": "^2.1.0",
"run-parallel": "^1.1.2",
@@ -58,7 +61,6 @@
"thunky": "^0.1.0",
"uuid": "^2.0.1",
"vanadium": "*",
- "verror": "^1.6.0",
"xtend": "^4.0.0"
}
}