blob: 112a0f682842126aeb89a76dc2da90b5fbff08c9 [file] [log] [blame]
// Copyright 2015 The Vanadium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
require('es6-shim');
var promisify = require('es6-promisify');
var syncbase = require('syncbase');
var vanadium = require('vanadium');
var verror = vanadium.verror;
var $ = require('../util/jquery');
var defineClass = require('../util/define-class');
var debug = require('../debug');
/**
* Create app, db, and table structure in Syncbase.
*/
function setUp(context, app, db) {
function nonfatals(err) {
if (err instanceof verror.ExistError) {
console.info(err.msg);
} else {
throw err;
}
}
//TODO(rosswang) If {} will remain empty, can it be omitted?
return promisify(app.create.bind(app))(context, {})
.catch(nonfatals)
.then(function() {
return promisify(db.create.bind(db))(context, {});
})
.catch(nonfatals)
.then(function() {
var table = db.table('t');
return promisify(table.create.bind(table))(context, {});
})
.catch(nonfatals);
}
function joinKey(key) {
return key.join('.');
}
function splitKey(key) {
return key.split('.');
}
/**
* Translate Syncbase hierarchical keys to object structure for easier
* processing. '.' is chosen as the separator; '/' is reserved in Syncbase.
*
* It might be ideal to have the separator configurable, but certain separators
* need regex escaping.
*/
function recursiveSet(root, key, value) {
var member = key[0];
if (key.length > 1) {
var child = root[member];
if (!child) {
child = root[member] = {};
} else if (typeof child !== 'object') {
child = root[member] = { _: child };
}
recursiveSet(child, key.slice(1), value);
} else {
var obj = root[member];
if (obj) {
obj._ = value;
} else {
root[member] = value;
}
}
}
function recursiveDelete(root, key) {
var member = key[0];
if (key.length > 1) {
var child = root[member];
if (typeof child === 'object') {
recursiveDelete(child, key.slice(1));
if ($.isEmptyObject(child)) {
delete root[member];
}
}
} else {
delete root[member];
}
}
var SG_MEMBER_INFO = new syncbase.nosql.SyncgroupMemberInfo();
// TODO(rosswang): generalize this
// If this is updated, the regex in escapeKeyElement needs updating too.
var ESC = {
'_': '_',
'.': 'd',
'@': 'a',
'/': 's',
':': 'c'
};
var INV = {};
$.each(ESC, function(k, v) {
INV[v] = k;
});
var SyncbaseWrapper = defineClass({
statics: {
start: function(context, mountName) {
var service = syncbase.newService(mountName);
var app = service.app('travel');
var db = app.noSqlDatabase('db');
return setUp(context, app, db).then(function() {
return new SyncbaseWrapper(context, db, mountName);
});
},
escapeKeyElement: function(str) {
return str.replace(/_|\.|@|\/|:/g, function(m) {
return '_' + ESC[m];
});
},
unescapeKeyElement: function(str) {
return str.replace(/_(.)/g, function(m, p1) {
return INV[p1];
});
}
},
publics: {
/**
* @param fn a function executing the batch operations, receiving as its
* `this` context and first parameter the batch operation methods
* (put, delete), each of which returns a promise. The callback must return
* the overarching promise.
*/
batch: function(fn){
var self = this;
var opts = new syncbase.nosql.BatchOptions();
return this.manageWrite(this.runInBatch(this.context, this.db, opts,
function(db, cb) {
var t = db.table('t');
var putToSyncbase = promisify(t.put.bind(t));
var deleteFromSyncbase = promisify(t.deleteRange.bind(t));
var ops = {
put: function(k, v) {
return self.standardPut(putToSyncbase, k, v);
},
delete: function(k) {
return self.standardDelete(deleteFromSyncbase, k);
}
};
var p = fn.call(ops, ops);
if (p) {
p.then(function(result) {
return cb(null, result);
}, function(err) {
return cb(err);
});
} else {
cb();
}
}));
},
/**
* @param k array of key elements
* @param v serialized value
*/
put: function(k, v) {
return this.manageWrite(this.standardPut(this.putToSyncbase, k, v));
},
delete: function(k) {
return this.manageWrite(this.standardDelete(this.deleteFromSyncbase, k));
},
// TODO(rosswang): transitional
getData: function() {
return this.data;
},
/**
* Since I/O is asynchronous, sparse, and fast, let's avoid concurrency/
* merging with the local syncbase instance by only starting a refresh if
* no writes are in progress and the refresh finishes before any new writes
* have started. Client watch should help make this better. In any case if
* this becomes starved, we can be smarter by being sensitive to keys being
* updated at any given time.
*
* We can also get around this problem by restructuring the data flow to
* be unidirectional with the local Syncbase as the authority, though that
* introduces (hopefully negligible) latency and complicates forked response
* on user input for the same data.
*
* @returns a void promise for this refresh
*/
refresh: function() {
var self = this;
var current = this.pull.current;
if (!current) {
current = this.pull.current = this.pull().then(function(data) {
self.pull.current = null;
self.data = data;
self.onUpdate(data);
return data;
}, function(err) {
self.pull.current = null;
throw err;
});
}
return current;
},
/**
* @see refresh
*/
pull: function(prefix) {
var self = this;
function repull() {
return self.pull(prefix);
}
if (this.writes.size) {
return Promise.all(this.writes)
.then(repull, repull);
} else {
this.dirty = false;
return new Promise(function(resolve, reject) {
var newData = {};
var abort = false;
var isHeader = true;
var query = 'select k, v from t';
if (prefix) {
query += ' where k like "' + joinKey(prefix) + '%"';
}
self.db.exec(self.context, query, function(err) {
if (err) {
reject(err);
} else if (abort) {
//no-op; promise has already been resolved.
} else if (self.dirty) {
debug.log('Syncbase: aborting refresh due to writes');
resolve(repull()); //try/wait for idle again
} else {
resolve(newData);
}
}).on('data', function(row) {
if (isHeader) {
isHeader = false;
return;
}
if (abort) {
//no-op
} else if (self.dirty) {
abort = true;
resolve(repull()); //try/wait for idle again
/* It would be nice to abort this stream for real, but we can't.
* Leave this handler attached but no-oping to drain the stream.
*/
} else {
recursiveSet(newData, splitKey(row[0]), row[1]);
}
}).on('error', reject);
}).catch(function(err) {
if (err instanceof verror.InternalError) {
console.error(err);
} else {
throw err;
}
});
}
},
// TODO(rosswang): end transitional
syncgroup: function(sgAdmin, name) {
var self = this;
name = vanadium.naming.join(sgAdmin, '%%sync', name);
var sg = this.db.syncgroup(name);
//syncgroup-promisified
var sgp;
function chainable(cb) {
return function(err) {
cb(err, sgp);
};
}
var create = promisify(function(spec, cb) {
debug.log('Syncbase: create syncgroup ' + name);
sg.create(self.context, spec, SG_MEMBER_INFO, chainable(cb));
});
var destroy = promisify(function(cb) {
debug.log('Syncbase: destroy syncgroup ' + name);
sg.destroy(self.context, cb);
});
var join = promisify(function(cb) {
sg.join(self.context, SG_MEMBER_INFO, chainable(cb));
});
var getSpec = promisify(function(cb) {
sg.getSpec(self.context, function(err, spec, version) {
cb(err, {
spec: spec,
version: version
});
});
});
var setSpec = promisify(function(spec, version, cb) {
sg.setSpec(self.context, spec, version, chainable(cb));
});
/* Be explicit about arg lists because promisify is sensitive to extra
* args. i.e. even though destroy and join could just be fn refs, since
* they're made by promisify, wrap them in a fn that actually takes 0
* args. */
sgp = {
buildSpec: function(prefixes, mountTables, admin, initialPermissions) {
return new syncbase.nosql.SyncgroupSpec({
perms: new Map([
['Admin', {in: [admin]}],
['Read', {in: initialPermissions}],
['Write', {in: initialPermissions}],
['Resolve', {in: initialPermissions}],
['Debug', {in: [admin]}]
]),
prefixes: prefixes.map(function(p) {
return new syncbase.nosql.TableRow({
tableName: 't',
row: joinKey(p)
});
}),
mountTables: mountTables
});
},
create: function(spec) { return create(spec); },
destroy: function() { return destroy(); },
join: function() { return join(); },
getSpec: function() { return getSpec(); },
setSpec: function(spec, version) { return setSpec(spec, version); },
changeSpec: function(fn) {
return sgp.getSpec().then(function(versionedSpec) {
var spec = versionedSpec.spec;
return sgp.setSpec(fn(spec) || spec, versionedSpec.version)
.catch(function(err) {
if (err instanceof verror.VersionError) {
return sgp.changeSpec(fn);
} else {
throw err;
}
});
});
},
createOrJoin: function(spec) {
return sgp.create(spec)
.catch(function(err) {
if (err instanceof verror.ExistError) {
debug.log('Syncbase: syncgroup ' + name + ' already exists.');
return sgp.join();
} else {
throw err;
}
});
},
joinOrCreate: function(spec) {
return sgp.join()
.catch(function(err) {
//if (err instanceof verror.NoExistError) {
debug.log('Syncbase: syncgroup ' + name + ' does not exist.');
return sgp.createOrJoin(spec);
/*} else {
throw err;
}*/
});
}
};
return sgp;
},
/**
* @return {
* data,
* onChange*(key, ?value, continued),
* onUpdate*(data),
* onError*(err),
* onClose*(?err)
* }
*/
getWatchedObject: function(prefix) {
var result = {
data: {}
};
var onChange = defineClass.event(result, 'onChange');
var onUpdate = defineClass.event(result, 'onUpdate');
var onError = defineClass.event(result, 'onError');
var onClose = defineClass.event(result, 'onClose', 'memory');
function put(k, v) {
recursiveSet(result.data, k, v);
}
return this.getRawWatched(prefix, {
onData: put
}, {
onPut: function(k, v, continued) {
put(k, v);
onChange(k, v, continued);
},
onDelete: function(k, continued) {
recursiveDelete(result.data, k);
onChange(k, null, continued);
},
onBatchEnd: function() {
onUpdate(result.data);
},
onError: onError,
onClose: onClose
}).then(function() {
return result;
});
},
/**
* Pulls data from Syncbase and registers watch handlers. Returns a promise
* resolving after the initial pull.
*
* @param pullHandler { onData(key, value), onError(err) }
* @param streamHandler {
* ?onPut(key, value, continued),
* ?onDelete(key, continued),
* ?onBatchEnd(),
* onError(err),
* onClose(?err)
* }; These are callbacks rather than events to guarantee that no updates
* are missed. `continued` indicates whether a change is followed by more
* changes in same batch.
* @return a promise resolving after the initial pull completes. Watch
* callbacks may continue to be called until onClose.
*/
getRawWatched: function(prefix, pullHandler, streamHandler) {
var self = this;
var resumeMarker;
var opts = new syncbase.nosql.BatchOptions();
return self.runInBatch(self.context, self.db, opts, function(db, cb) {
Promise.all([
self.pull2(db, prefix, pullHandler.onData, pullHandler.onError),
promisify(db.getResumeMarker.bind(db))(self.context)
]).then(function(args) {
resumeMarker = args[1];
cb('abort');
}, cb);
}).catch(function(err) {
if (err !== 'abort') {
throw err;
}
var stream = self.db.watch(self.context, 't', joinKey(prefix),
resumeMarker, streamHandler.onClose);
stream.on('data', function(change) {
try {
switch(change.changeType) {
case 'put':
new Promise(function(resolve, reject) {
if (streamHandler.onPut) {
change.getValue(function(err, value) {
if (err) {
reject(err);
} else {
resolve(streamHandler.onPut(splitKey(change.rowName),
value, change.continued));
}
});
} else {
resolve();
}
}).then(function() {
if (!change.continued && streamHandler.onBatchEnd) {
return streamHandler.onBatchEnd();
}
}).catch(streamHandler.onError);
break;
case 'delete':
new Promise(function(resolve, reject) {
if (streamHandler.onDelete) {
resolve(streamHandler.onDelete(splitKey(change.rowName),
change.continued));
} else {
resolve();
}
}).then(function() {
if (!change.continued && streamHandler.onBatchEnd) {
return streamHandler.onBatchEnd();
}
}).catch(streamHandler.onError);
break;
default:
streamHandler.onError(
new Error('Invalid change type ' + change.changeType));
}
} catch(err) {
streamHandler.onError(err);
}
}).on('error', streamHandler.onError);
});
},
},
privates: {
/**
* TODO(rosswang): transitional: pull2 => pull
*
* Handlers may continue to be called even after the promise has been
* rejected.
*
* @param onData handler callback that takes a key, value pair.
* @param onError handler callback that takes an error.
* @return a promise that resolves when the pull is complete or rejects if
* the pull failed or either handler threw.
*/
pull2: function(db, prefix, onData, onError) {
var self = this;
return new Promise(function(resolve, reject) {
var isHeader = true;
var query = 'select k, v from t';
if (prefix) {
query += ' where k like "' + joinKey(prefix) + '%"';
}
db.exec(self.context, query, function(err) {
if (err) {
reject(err);
} else {
resolve();
}
}).on('data', function(row) {
if (isHeader) {
isHeader = false;
return;
}
try {
onData(splitKey(row[0]), row[1]);
} catch (err) {
reject(err);
}
}).on('error', function(err) {
if (!onError) {
reject(err);
} else {
try {
onError(err);
} catch (err2) {
reject(err2);
}
}
});
});
},
/* TODO(rosswang): Keep this around even though the dirty flag and write
* records are not used since watch integration, because there is still a
* potential race condition; I'm just not particularly sure how to deal with
* it yet. If it turns out we don't have to worry about it, delete this. */
manageWrite: function(promise) {
var writes = this.writes;
this.dirty = true;
writes.add(promise);
return promise.then(function(v) {
writes.delete(promise);
return v;
}, function(err) {
writes.delete(promise);
throw err;
});
},
standardPut: function(fn, k, v) {
k = joinKey(k);
return fn(this.context, k, v);
},
standardDelete: function(fn, k) {
k = joinKey(k);
debug.log('Syncbase: delete ' + k);
return fn(this.context, syncbase.nosql.rowrange.prefix(k));
}
},
constants: [ 'mountName' ],
// TODO(rosswang): transitional
events: {
onError: 'memory',
onUpdate: '',
},
// TODO(rosswang): end transitional
init: function(context, db, mountName) {
// TODO(rosswang): mountName probably won't be necessary after syncgroup
// admin instances are hosted (see group-manager).
var self = this;
this.context = context;
this.db = db;
this.t = db.table('t');
this.mountName = mountName;
this.writes = new Set();
this.runInBatch = promisify(syncbase.nosql.runInBatch);
this.putToSyncbase = promisify(this.t.put.bind(this.t));
this.deleteFromSyncbase = promisify(this.t.deleteRange.bind(this.t));
// TODO(rosswang): transitional
function watchLoop() {
if (!self.pull.current) {
self.refresh().catch(self.onError);
}
setTimeout(watchLoop, 500);
}
process.nextTick(watchLoop);
// TODO(rosswang): end transitional
}
});
module.exports = SyncbaseWrapper;