blob: 7f0f2a7f4c988b7eed2c70193853470e176ee54b [file] [log] [blame]
// Copyright 2015 The Vanadium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
var assert = require('assert');
var BlobReader = require('readable-blob-stream');
var db = require('./db');
var debug = require('debug')('reader:syncbase');
var dz = require('dezalgo');
var eos = require('end-of-stream');
var error = require('../error');
var EventEmitter = require('events').EventEmitter;
var extend = require('xtend');
var format = require('util').format;
var get = require('./get');
var inherits = require('inherits');
var ms = require('ms');
var once = require('once');
var parse = require('./parse');
var prr = require('prr');
var put = require('./put');
var syncbase = require('syncbase');
var table = require('./table');
var through = require('through2');
var util = require('../util');
var vanadium = require('vanadium');
var verror = vanadium.verror;
var window = require('global/window');
module.exports = Store;
// Naming for the Syncbase setup looks like <app-name>/<db>/<table>, these
// values are hard-coded in ./db.js and result in serices being mounted with the
// following structure:
//
// <name>/reader/db/<table>
//
// The app, database, table setup is lazily evaluated and will create missing
// components in the corresponding syncbase if they are missing.
function Store(options) {
if (!(this instanceof Store)) {
return new Store(options);
}
debug('initializing syncbase: %o', options);
assert.ok(options.runtime, 'options.runtime required');
assert.ok(options.name, 'options.name required');
EventEmitter.call(this);
var store = this;
prr(store, 'runtime', options.runtime);
prr(store, 'name', options.name, 'e');
prr(store, '_tables', {});
prr(store, '_db', {}, 'c');
store._status = 'new';
store.on('status', onstatus);
function onstatus(status) {
debug('status: %s', status);
store._status = status;
}
}
inherits(Store, EventEmitter);
Store.prototype.status = function(status) {
var store = this;
return store._status === status;
};
Store.prototype.db = function(callback) {
callback = dz(callback);
var store = this;
if (store.status('ready')) {
callback(null, store._db);
return;
}
// In case a previous call is in process it's callback will handle the error
// case, but current calls will need to be queued. If the in-process call is
// successful queued callbacks can be called via the "db" event.
if (store.status('initializing')) {
store.once('db', callback.bind(null, null));
return;
}
store.emit('status', 'initializing');
var context = store.runtime.getContext();
var name = store.name;
db(context, name, ondb);
function ondb(err, db) {
if (err) {
callback(err);
return;
}
prr(store, '_db', db);
store.emit('db', db);
store.emit('status', 'ready');
callback(null, db);
}
};
Store.prototype.table = function(keyspace, callback) {
callback = dz(callback);
var store = this;
var cached = store._tables[keyspace];
if (cached) {
callback(null, cached);
return;
}
store.db(ondb);
function ondb(err, db) {
if (err) {
callback(err);
return;
}
var context = store.runtime.getContext();
table(context, db, keyspace, ontable);
}
function ontable(err, table) {
if (err) {
callback(err);
return;
}
store._tables[keyspace] = table;
callback(null, table);
}
};
Store.prototype.get = function(keyspace, key, callback) {
var store = this;
store.table(keyspace, ontable);
function ontable(err, table) {
if (err) {
callback(err);
return;
}
var context = store.runtime.getContext();
get(context, table, key, onget);
}
function onget(err, data) {
if (err) {
var template = 'syncbase #get("%s", "%s", callback) failed';
var message = format(template, keyspace, key);
error(err, message, callback);
return;
}
callback(null, data);
}
};
Store.prototype.put = function(keyspace, data, callback) {
assert.ok(data.id, 'data.id is required.');
var store = this;
var context = store.runtime.getContext();
store.table(keyspace, ontable);
function ontable(err, table) {
if (err) {
return callback(err);
}
if (keyspace === 'files' && !data.ref) {
store.putBlob(data.blob, onref);
} else {
put(context, table, data, callback);
}
return;
function onref(err, ref) {
if (err) {
return callback(err);
}
data.ref = ref;
put(context, table, data, callback);
}
}
};
Store.prototype.del = function(keyspace, key, callback) {
var store = this;
store.table(keyspace, ontable);
function ontable(err, table) {
var context = store.runtime.getContext();
var ctx = context.withTimeout(ms('5s'));
table.delete(ctx, key, callback);
}
};
Store.prototype.putBlob = function(blob, callback) {
assert.ok(blob instanceof window.Blob, 'Must use a Blob object.');
var store = this;
store.db(ondb);
function ondb(err, db) {
if (err) {
return callback(err);
}
var context = store.runtime.getContext();
var ctx = context.withTimeout(ms('5s'));
db.createBlob(ctx, onblob);
}
function onblob(err, vblob) {
if (err) {
callback(err);
return;
}
var reader = new BlobReader(blob);
var context = store.runtime.getContext();
var ctx = context.withTimeout(ms('5s'));
var done = once(onput);
var writer = vblob.put(ctx, done);
eos(reader, done);
eos(writer, { readable: false }, done);
reader.pipe(writer);
function onput(err) {
if (err) {
callback(err);
return;
}
var context = store.runtime.getContext();
var ctx = context.withTimeout(ms('5s'));
vblob.commit(ctx, function oncommit(err) {
if (err) {
callback(err);
return;
}
callback(null, vblob.ref);
});
}
}
};
Store.prototype.createReadStream = function(keyspace, options) {
options = extend({
start: '',
limit: ''
}, options);
var store = this;
var stream = through.obj(write);
store.table(keyspace, ontable);
return stream;
function ontable(err, table) {
if (err) {
stream.emit('error', err);
return;
}
var range = syncbase.nosql.rowrange.range(options.start, options.limit);
var done = once(end);
var context = store.runtime.getContext();
var ctx = context.withTimeout(ms('5s'));
var reader = table.scan(ctx, range, done);
eos(reader, done);
reader.pipe(stream);
}
// Data chunks are `syncbase/nosql.KeyValue` objects.
function write(chunk, enc, callback) {
var item = {
key: chunk.key
};
parse(chunk.value, onparse);
function onparse(err, data) {
if (err) {
callback(err);
return;
}
item.value = data;
callback(null, item);
}
}
function end(err) {
if (err) {
stream.emit('error', err);
return;
}
}
};
Store.prototype.createWatchStream = function(keyspace) {
var store = this;
var stream = through.obj(write);
store.db(ondb);
return stream;
function ondb(err, db) {
if (err) {
stream.emit('error', err);
return;
}
var context = store.runtime.getContext();
var ctx = context.withTimeout(ms('5s'));
db.getResumeMarker(ctx, onmarker);
function onmarker(err, marker) {
if (err) {
stream.emit('error', err);
return;
}
var prefix = '';
var done = once(end);
var context = store.runtime.getContext();
var ctx = context.withCancel();
var reader = db.watch(ctx, keyspace, prefix, marker, done);
eos(reader, reader);
reader.pipe(stream);
}
}
function write(chunk, enc, callback) {
// Ignore local changes and only write changes from sync updates.
if (!chunk.fromSync) {
callback(null);
return;
}
debug('change: %o', chunk);
var change = {
type: chunk.changeType,
key: chunk.rowName,
keyspace: chunk.tableName,
};
switch (change.type) {
case 'put':
store.get(change.keyspace, change.key, onget);
break;
case 'delete':
callback(null, change);
break;
}
function onget(err, value) {
if (err) {
callback(err);
return;
}
debug('get success: %o', value);
change.value = value;
callback(null, change);
}
}
function end(err) {
if (err) {
stream.emit('error', err);
return;
}
}
};
Store.prototype.sync = function(callback) {
var _db;
var store = this;
var username = util.parseName(store.name).username;
var context = store.runtime.getContext();
// The Vanadium name of the running syncbase instance that hosts the sync
// group.
var name = 'users/' + username + '/reader/cloudsync';
var syncname = [
name,
'%%sync', // Syncbase naming scheme.
'cloudsync' // Suffix.
].join('/');
// TODO(jasoncampbell): Find docs or something about what this is and what it
// does.
var info = new syncbase.nosql.SyncgroupMemberInfo({
syncPriority: 8
});
db(context, name, ondb);
function ondb(err, db) {
if (err) {
callback(err);
return;
}
_db = db;
table(context, db, 'files', ontable);
}
function ontable(err, table) {
if (err) {
callback(err);
return;
}
var permissions = new window.Map([
[ 'Admin', { 'in': [ '...' ] } ],
[ 'Read', { 'in': [ '...' ] } ],
[ 'Write', { 'in': [ '...' ] } ],
[ 'Resolve', { 'in': [ '...' ] } ],
[ 'Debug', { 'in': [ '...' ] } ]
]);
var spec = new syncbase.nosql.SyncgroupSpec({
description: 'reader syncgroup ',
perms: permissions,
// Prefixes are structured as {<tableName>, <keyPrefix>} where <keyPrefix>
// matches row keys. Rows have Vanadium object names of the form
// <syncbaseName>/reader/db/<table>/<rowKey>, so a syncgroup prefix can be
// thought of as a vanadium namespace glob over rows.
prefixes: [new syncbase.nosql.TableRow({
tableName: 'files',
row: 'c'
})],
// mountTables: [ ... ] - actually a rendezvous point that is
// permissable to mount to by the syncbase instance hosting the sync
// group.
//
// Note this name NEEDS to be rooted.
mountTables: [
'/ns.dev.v.io:8101/users/' + username + '/reader/rendezvous'
]
});
var ctx = context.withTimeout(ms('5s'));
var group = _db.syncgroup(syncname);
group.create(ctx, spec, info, oncreate);
}
function oncreate(err) {
if (err && !(err instanceof verror.ExistError)) {
return callback(err);
}
debug('remote syncbase configured!');
// Now setup local syncbase to join the remote syncgroup.
store.db(function(err, db) {
if (err) {
return callback(err);
}
var ctx = context.withTimeout(ms('5s'));
var group = db.syncgroup(syncname);
debug('joining syncgroup: %s', syncname);
group.join(ctx, info, onjoin);
});
function onjoin(err) {
if (err && !(err instanceof verror.ExistError)) {
return callback(err);
}
callback(null);
}
}
};