// blob: 2f295117de4349decac3f6e0d368a13e0c75023a [file] [log] [blame]
// Copyright 2015 The Vanadium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
var assert = require('assert');
var debug = require('debug')('reader:vanadium');
var eos = require('end-of-stream');
var EventEmitter = require('events').EventEmitter;
var glob = require('./glob-stream');
var inherits = require('inherits');
var ms = require('ms');
var once = require('once');
var prr = require('prr');
var service = require('./service');
var syncbase = require('./syncbase');
var vanadium = require('vanadium');
var waterfall = require('run-waterfall');
var window = require('global/window');
module.exports = connect;
// Module entry point: builds a Client from `options` and kicks off its mount
// sequence right away.
//
// A mount failure is not thrown and not passed back to the caller; it is
// re-emitted on the returned client as an 'error' event. The client is
// returned as soon as mount has been started - this function does not wait on
// the mount callback.
function connect(options) {
  var instance = new Client(options);

  instance.mount(function onmount(err) {
    if (!err) {
      debug('discovery is setup');
      return;
    }

    instance.emit('error', err);
  });

  return instance;
}
// Client is an EventEmitter that holds the state for one reader instance: its
// id, mount name, mounted flag, Vanadium runtime and the service object
// created from it. It may be called with or without `new`.
//
// Construction only sets up state and event wiring:
//   'service:announce' -> handed to client.connect (bound to this client)
//   'runtime'          -> first emission only: creates the syncbase store,
//                         attaches it as client.syncbase via prr and emits
//                         'syncbase' with the store
//
// NOTE(review): client.peers is used by `connect` and `remotes` but is not
// assigned anywhere in this constructor - presumably it is attached elsewhere
// (possibly by ./service); confirm.
function Client(options) {
  if (!(this instanceof Client)) {
    return new Client(options);
  }

  debug('instantiating: %o', options);

  var self = this;

  EventEmitter.call(self);

  self.id = options.id;
  self.name = '';
  self.mounted = false;
  self.runtime = {};
  // The service factory receives the client after the fields above are set.
  self.service = service(self);

  // TODO(jasoncampbell): Come up with a better way to couple the bare service
  // instance with the client's methods.
  self.on('service:announce', self.connect.bind(self));
  self.once('runtime', onruntime);

  // The syncbase name is the client's own name with a trailing "/app" swapped
  // for "/syncbase"; self.name is read at the time the event fires.
  function onruntime(runtime) {
    var store = syncbase({
      runtime: runtime,
      name: self.name.replace(/\/app$/, '/syncbase')
    });

    prr(self, 'syncbase', store);
    self.emit('syncbase', store);
    debug('runtime is available');
  }
}

inherits(Client, EventEmitter);
// Runs the mount sequence - init, then serve, then glob - as a waterfall.
//
// On failure `callback` receives the error and `mounted` is left untouched.
// On success `mounted` is set to true and `callback` is invoked with no
// arguments.
Client.prototype.mount = function(callback) {
  var self = this;
  var steps = [ 'init', 'serve', 'glob' ].map(function bindStep(method) {
    return self[method].bind(self);
  });

  waterfall(steps, function onfinish(err) {
    if (err) {
      return callback(err);
    }

    self.mounted = true;
    callback();
  });
};
// First mount step: initializes the Vanadium runtime.
//
// On success the runtime is stored on the client, the client's mount name is
// derived from it, 'runtime' is emitted, and `callback(null, runtime)` is
// called. On failure `callback(err)` is called and nothing else happens.
Client.prototype.init = function(callback) {
  var self = this;
  var config = {
    appName: 'reader',
    namespaceRoots: [ '/ns.dev.v.io:8101' ]
  };

  vanadium.init(config, function onruntime(err, runtime) {
    if (err) {
      return callback(err);
    }

    // TODO(jasoncampbell): When this happens the window really, really needs
    // to be reloaded. In order to safely reload the page, state should be
    // stored or serialized in a way that makes it recoverable for this error
    // case.
    runtime.on('crash', function oncrash(err) {
      debug('runtime crash: %s', err.stack);
      self.emit('error', err);
    });

    self.runtime = runtime;

    // Names in use:
    //   <prefix>/reader/:id
    //   <prefix>/reader/:id/syncbase
    //   <prefix>/reader/syncgroup
    //
    // The name must be assigned before 'runtime' is emitted so listeners see
    // it.
    self.name = getName(runtime, self.id);
    self.emit('runtime', runtime);

    return callback(null, runtime);
  });
};
// Second mount step: serves the client's service under the client's name on
// the given runtime.
//
// On success a 'beforeunload' listener is registered on `window` that removes
// the name from the namespace and stops the server, then
// `callback(null, runtime)` is called. On failure `callback(err)` is called.
Client.prototype.serve = function(runtime, callback) {
  var self = this;

  debug('serve: %s', self.name);

  runtime.newServer(self.name, self.service, function onserve(err, server) {
    if (err) {
      return callback(err);
    }

    // Fall back to a no-op when the window object has no addEventListener.
    window.addEventListener = window.addEventListener || noop;
    window.addEventListener('beforeunload', function beforeunload() {
      debug('closing Vanadium runtime');

      var namespace = runtime.getNamespace();
      var context = runtime.getContext();

      // TODO(jasoncampbell): Inspect whether or not these methods actually
      // have time to finish executing, possibly run them in parallel with a
      // callback that fires an alert to test...
      namespace.delete(context, self.name, true, noop);
      server.stop(noop);
    });

    callback(null, runtime);
  });
};
// Third mount step: globs the namespace until this client's own name has been
// discovered.
//
// `callback` is wrapped with once() and handed to end-of-stream, so it is
// invoked at most one time when the glob stream finishes or errors.
Client.prototype.glob = function(runtime, callback) {
  var done = once(callback);
  var self = this;

  // Glob pattern based on "<prefix>/reader/:id/app"
  var base = prefix(runtime).replace('/chrome', '');
  var names = glob({
    name: self.name,
    runtime: runtime,
    pattern: base + '/reader/*/app',
    timeout: ms('12s')
  });

  // Glob stream ends once the mounted client.name has been discovered or there
  // is an error.
  //
  // NOTE: Streaming names through the service connection has been commented
  // out; this will be addressed later, as the previous client/service model is
  // unnecessary given the way syncbase is being leveraged.
  //
  // TODO(jasoncampbell): Hook up a simple discovery mechanism to detect new
  // peers and share information about constellation state like disconnects.
  eos(names, { readable: false }, done);

  // stream
  // .pipe(filter(peers))
  // .pipe(through(write))
  // .on('error', function(err) {
  //   debug('peer-stream error: %s', err.stack);
  //   client.emit('error', err);
  // });
  //
  // function write(buffer, enc, cb) {
  //   var name = buffer.toString();
  //
  //   client.connect(name);
  //
  //   if (name === client.name) {
  //     onmount();
  //   }
  //
  //   cb(null, buffer);
  // }
};
// Connects to the peer mounted at `name` and announces this client to it.
//
// Peer bookkeeping lives in client.peers and moves through these statuses:
//   'self'       - `name` is this client's own name; never dialed
//   'connecting' - bindTo has been issued
//   'stale'      - bindTo failed with NoServers (entry is also removed from
//                  the namespace, and dropped locally if the client is mounted)
//   'connected'  - announce succeeded; the remote stub is stored alongside
//
// Any other bindTo failure, or an announce failure, is emitted as 'error' and
// the peer entry is left as 'connecting'.
//
// Throws (via assert) when `name` is falsy.
//
// NOTE(review): client.peers is not assigned anywhere in this file; it is used
// here through get/put/delete and called as a function in `remotes` -
// presumably an observable hash attached elsewhere; confirm.
Client.prototype.connect = function(name) {
  assert.ok(name, 'name is required');

  var self = this;
  var peers = self.peers;
  var known = peers.get(name);

  // No need to connect to the local service.
  if (!known && name === self.name) {
    peers.put(name, { status: 'self' });
  }

  // No need to connect if the peer is known.
  if (peers.get(name)) {
    return;
  }

  debug('connecting to peer: %s', name);
  peers.put(name, { status: 'connecting' });

  var runtime = self.runtime;
  var vclient = runtime.getClient();
  var context = runtime.getContext();

  vclient.bindTo(context, name, onremote);

  function onremote(err, remote) {
    if (err) {
      // NOTE: It is possible the name is a stale entry in the mount table, if
      // that is the case this error and the onremote callback will take a
      // while to complete (upwards of 45 seconds).
      //
      // It might be better for each peer to track this on their own...
      if (err.id !== 'v.io/v23/verror.NoServers') {
        self.emit('error', err);
        return;
      }

      onstale();
      return;
    }

    // TODO(jasoncampbell): This should happen in an interval so that changes
    // in the remote's state can be detected early instead of assuming it will
    // work at a later time...
    remote.announce(context, self.name, function onannounce(err, response) {
      if (err) {
        debug('announce errored: %s', err.stack);
        // TODO(jasoncampbell): Come up with a strategy to alert other peers
        // about stale state of this peer...
        self.emit('error', err);
        return;
      }

      debug('announced to "%s" - %s', name, response);
      peers.put(name, {
        status: 'connected',
        remote: remote
      });
    });
  }

  // Marks the peer stale and cleans up its mount table entry.
  function onstale() {
    debug('stale mounttable entry: %s', name);

    // TODO(jasoncampbell): Come up with a strategy to alert other peers about
    // stale state of this peer...
    peers.put(name, { status: 'stale' });

    // Do some cleanup and remove the stale entry so other peers don't have to
    // deal with this error case.
    runtime.getNamespace().delete(context, name, true, noop);

    // Remove the local stale reference if the client is mounted. This prevents
    // re-connect from being attempted when the glob stream is active.
    if (self.mounted) {
      peers.delete(name);
    }
  }
};
// Returns the peers whose `status` strictly equals the given status, in key
// order of the object returned by client.peers().
//
// When `mapper` is truthy each matching peer is passed to it (as its only
// argument) and the mapper's return value is collected in place of the peer.
Client.prototype.remotes = function(status, mapper) {
  var snapshot = this.peers();
  var matches = [];

  Object.keys(snapshot).forEach(function collect(key) {
    var entry = snapshot[key];

    if (entry.status !== status) {
      return;
    }

    matches.push(mapper ? mapper(entry) : entry);
  });

  return matches;
};
// TODO(jasoncampbell): Move naming related code into a separate module.
// Builds the mount name for a client's app service:
//
//   <prefix>/reader/<id>/app
//
// where <prefix> is prefix(runtime) with its first '/chrome' occurrence
// removed. Segments are joined with '/'.
function getName(runtime, id) {
  var base = prefix(runtime).replace('/chrome', '');
  var segments = [ base, 'reader', id, 'app' ];

  return segments.join('/');
}
// Helper function to return a mountable prefix name from a runtime.
//
// Rewrites a leading "dev.v.io/u/" in runtime.accountName to "users/"; an
// account name that does not start with exactly that text is returned
// unchanged. The dots in the pattern are escaped so only a literal "dev.v.io"
// matches - an unescaped "." would match any character.
function prefix(runtime) {
  return runtime.accountName.replace(/^dev\.v\.io\/u\//, 'users/');
}
// Shared do-nothing function, passed where a callback argument is supplied but
// the result is ignored, and used as the fallback for a missing
// window.addEventListener.
function noop() {}