// blob: 5385fc8c11e94cc40adc570ca5aea9dcb12df20c [file] [log] [blame]
var veyron = require('veyron');
var mercury = require('mercury');
var LRU = require('lru-cache');
var EventEmitter = require('events').EventEmitter;
var jsonStableStringify = require('json-stable-stringify');
var namespaceUtil = veyron.namespaceUtil;
var veyronConfig = require('../../veyron-config');
var itemFactory = require('./item');
var freeze = require('../../lib/mercury/freeze');
var adaptSignature = require('./signature-adapter');
var log = require('../../lib/log')('services:namespace:service');
// Public API of the namespace service. The functions referenced here are
// function declarations further down in this file, so they are hoisted and
// already defined when this object literal is evaluated.
module.exports = {
getChildren: getChildren,
getNamespaceItem: getNamespaceItem,
getSignature: getSignature,
getAccountName: getAccountName,
hashSignature: hashSignature,
makeRPC: makeRPC,
search: search,
// Re-export of veyron.namespaceUtil.
util: namespaceUtil,
// Exposed as initVanadium; implemented by the lazy runtime getter getRuntime.
initVanadium: getRuntime
};
// Timeout handed to veyron.context.Context().withTimeout() for the glob,
// signature and RPC calls made in this file.
// Presumably expressed in milliseconds (i.e. 30 seconds) - TODO confirm units.
//TODO(aghassemi) What's a good timeout? It should be shorter than this.
//Can we use ML to dynamically change the timeout?
//Should this be a user setting?
var RPC_TIMEOUT = 30 * 1000;
/*
 * Lazy getter and initializer for the Veyron runtime.
 * The first call invokes veyron.init(veyronConfig) and remembers whatever it
 * returns; every later call hands back that same remembered value.
 * @return The value returned by veyron.init (used as a promise of the runtime
 * by the callers in this file).
 */
var _runtimePromiseInstance;
function getRuntime() {
  if (_runtimePromiseInstance) {
    // Already initialized: reuse the stored instance.
    return _runtimePromiseInstance;
  }
  _runtimePromiseInstance = veyron.init(veyronConfig);
  return _runtimePromiseInstance;
}
/*
 * Returns the accountName for the currently logged in user of Viz, read from
 * the accountName property of the runtime.
 * @return {Promise.<string>} Promise of the runtime's accountName
 */
function getAccountName() {
  var runtimePromise = getRuntime();
  return runtimePromise.then(function pickAccountName(runtime) {
    return runtime.accountName;
  });
}
/*
 * globCache is an LRU cache limited to GLOB_CACHE_MAX_SIZE entries.
 * glob() stores its observable result here under the key 'glob|' + pattern
 * and removes the entry again when the glob fails or its stream errors.
 */
var GLOB_CACHE_MAX_SIZE = 100;
var globCache = new LRU({
max: GLOB_CACHE_MAX_SIZE
});
/*
 * Given a glob pattern returns a promise of an observable array
 * of items as defined in @see item.js
 * As new items become available the observable array will change to reflect
 * the changes.
 *
 * The observable result has an events property which is an EventEmitter
 * and emits 'end' (once the stream ended and every item's onFinish promise
 * settled) and 'streamError' (when the glob stream reports an error).
 *
 * Results are cached in globCache under 'glob|' + pattern. The entry is
 * dropped when the glob cannot be started or when its stream errors.
 *
 * @param {string} pattern Glob pattern
 * @return {Promise.<mercury.array>} Promise of an observable array
 * of namespace items
 */
function glob(pattern) {
  var cacheKey = 'glob|' + pattern;
  var cacheHit = globCache.get(cacheKey);
  if (cacheHit) {
    // The addition of the end event to mark the end of a glob requires that
    // our cache also causes the same event to be emitted.
    if (cacheHit._hasEnded) {
      // Remove old listeners to avoid memory leak but now we need to set
      // _hasEnded to false until we trigger again otherwise a new request
      // could wipe out an outstanding listener.
      cacheHit._hasEnded = false;
      cacheHit.events.removeAllListeners();
      // Emit asynchronously so the caller can attach its listeners first.
      process.nextTick(function() {
        cacheHit.events.emit('end');
        cacheHit._hasEnded = true;
      });
    }
    return Promise.resolve(cacheHit);
  }

  var ctx = veyron.context.Context().withTimeout(RPC_TIMEOUT);
  // Mutable array filled by the stream handlers below; callers only ever see
  // the frozen wrapper, which also carries the events emitter.
  var globItemsObservArr = mercury.array([]);
  var immutableResult = freeze(globItemsObservArr);
  immutableResult.events = new EventEmitter();

  var globItemsObservArrPromise =
    getRuntime().then(function callGlobOnNamespace(rt) {
      // TODO(aghassemi) use watchGlob when available
      var namespace = rt.namespace();
      return namespace.glob(ctx, pattern).stream;
    }).then(function updateResult(globStream) {
      // onFinish promises of all items created so far; 'end' waits for them.
      var itemPromises = [];

      globStream.on('data', function createItem(globResult) {
        // Create an item as glob results come in and add the item to result
        var result = createNamespaceItem(globResult);
        var item = result.item;
        var onFinishPromise = result.onFinish;

        // TODO(aghassemi) namespace glob can return duplicate results, this
        // temporary fix keeps the one that's a server. Is this correct?
        // If a name can be more than one thing, UI needs change too.
        var existingItem = globItemsObservArr.filter(function(curItem) {
          return curItem().objectName === item().objectName;
        }).get(0);
        if (existingItem) {
          // override the old one if new item is a server
          if (item().isServer) {
            var index = globItemsObservArr.indexOf(existingItem);
            globItemsObservArr.put(index, item);
          }
        } else {
          globItemsObservArr.push(item);
        }

        itemPromises.push(onFinishPromise);
      });

      globStream.on('end', function() {
        var triggerEnd = function() {
          immutableResult.events.emit('end');
          immutableResult._hasEnded = true;
        };
        // Wait until all createItem promises return before triggering has ended
        Promise.all(itemPromises).then(triggerEnd).catch(triggerEnd);
      });

      globStream.on('error', function invalidateCacheAndLog(err) {
        globCache.del(cacheKey);
        immutableResult.events.emit('streamError', err);
        // Log the pattern that was globbed; there is no `name` in this scope.
        log.error('Glob stream error for', pattern, err);
      });
    }).then(function cacheAndReturnResult() {
      // Cache as soon as the stream handlers are wired up, not when the glob
      // ends, so the result can be shared while it is still filling.
      globCache.set(cacheKey, immutableResult);
      return immutableResult;
    }).catch(function invalidateCacheAndRethrow(err) {
      globCache.del(cacheKey);
      return Promise.reject(err);
    });

  // Return our Promise of observable array. It will get filled as data comes in
  return globItemsObservArrPromise;
}
/*
 * Given a name, provides information about the name as defined in
 * @see item.js
 * The name itself is globbed; once that glob emits 'end' the first result is
 * wrapped in a frozen observable value. The promise rejects when the glob
 * emits 'streamError' or ends without any result.
 * @param {string} objectName Object name to get namespace item for.
 * @return {Promise.<mercury.value>} Promise of an observable value of an item
 * as defined in @see item.js
 */
function getNamespaceItem(objectName) {
  // Globbing the name itself would provide information about the name.
  return glob(objectName).then(function waitForGlobToEnd(globResults) {
    return new Promise(function(resolve, reject) {
      var emitter = globResults.events;

      emitter.on('streamError', function onStreamError(err) {
        reject(err);
      });

      // Only look at the results after the glob has finished.
      emitter.on('end', function onGlobEnd() {
        var items = globResults();
        if (items.length !== 0) {
          resolve(freeze(new mercury.value(items[0])));
          return;
        }
        reject(new Error(objectName + ' Not Found'));
      });
    });
  });
}
/*
 * Given a name returns a promise of an observable array of immediate children
 * The glob pattern is '*' joined onto parentName, or just '*' when parentName
 * is empty or missing.
 * @param {string} parentName Object name to glob
 * @return {Promise.<mercury.array>} Promise of an observable array
 */
function getChildren(parentName) {
  if (!parentName) {
    // No parent: glob directly with the bare wildcard.
    return glob('*');
  }
  return glob(namespaceUtil.join(parentName, '*'));
}
/*
 * Given a name and a glob search query returns a promise of an observable array
 * of items as defined in @see item.js
 * As new items become available the observable array will change to reflect
 * the changes.
 * When parentName is empty or missing the pattern is globbed as given,
 * otherwise it is joined onto parentName first.
 * @param {string} parentName Object name to search in.
 * @param {string} pattern Glob search pattern.
 * @return {Promise.<mercury.array>} Promise of an observable array
 * of namespace items
 */
function search(parentName, pattern) {
  var fullPattern = parentName ?
    namespaceUtil.join(parentName, pattern) :
    pattern;
  return glob(fullPattern);
}
/*
 * signatureCache is an LRU cache limited to SIGNATURE_CACHE_MAX_SIZE entries.
 * getSignature() stores the adapted signature of an object here under the
 * key 'getSignature|' + objectName.
 */
var SIGNATURE_CACHE_MAX_SIZE = 10000;
var signatureCache = new LRU({
max: SIGNATURE_CACHE_MAX_SIZE
});
/*
 * Given an object name, returns a promise of the signature of methods
 * available on the object represented by that name.
 * The signature is passed through adaptSignature and cached in signatureCache
 * under 'getSignature|' + objectName; a cached value is returned without
 * contacting the object again.
 * @param {string} objectName Object name to get signature for
 * @return {Promise.<object>} Promise of the adapted signature for the object
 * represented by the given name
 */
function getSignature(objectName) {
  var cacheKey = 'getSignature|' + objectName;

  var cached = signatureCache.get(cacheKey);
  if (cached) {
    return Promise.resolve(cached);
  }

  function fetchSignature(rt) {
    var ctx = veyron.context.Context().withTimeout(RPC_TIMEOUT);
    return rt.newClient().signature(ctx, objectName);
  }

  function adaptAndCache(signatures) {
    var adapted = adaptSignature(signatures);
    signatureCache.set(cacheKey, adapted);
    return adapted;
  }

  return getRuntime().then(fetchSignature).then(adaptAndCache);
}
/*
 * Given a service signature, compute a reasonable hash that uniquely identifies
 * a service without containing unnecessary information.
 * The hash is the stable JSON serialization of a container that records, for
 * each entry visited by adaptedSignature.forEach, the number of inArgs.
 * TODO(alexfandrianto): This heuristic comes close, but it does not properly
 * distinguish services from each other.
 * The adapted signature now has type info, streaming info, interface name, etc.
 * NOTE(review): the counts are stored on an array, indexed by the second
 * argument forEach passes to the callback. If that key is a method-name string
 * rather than a numeric index, the counts become plain properties of the array
 * and are probably not part of the serialized output - confirm the intended
 * container against signature-adapter.
 * @param adaptedSignature Collection with a forEach(method, key) method whose
 * entries have an inArgs array.
 * @return {string} Serialized hash of the signature
 */
function hashSignature(adaptedSignature) {
  var argCounts = [];
  function recordArgCount(method, methodName) {
    argCounts[methodName] = method.inArgs.length;
  }
  adaptedSignature.forEach(recordArgCount);
  return jsonStableStringify(argCounts);
}
/*
 * TODO(aghassemi) Technically right now every server is globbable
 * so our definition of globbable is whether the server in question
 * has any children.
 * We may want to consider exposing some metadata about a service on
 * whether that service actually implements Glob or GetChildren
 * interfaces in a custom way or not.
 *
 * Given an object name, returns whether the service referenced by the name
 * supports globbing, i.e. whether getChildren() yields at least one child.
 * Resolves true as soon as a child is observed, false once the children glob
 * emits 'end', and false when getting the children fails.
 * @param {string} objectName Object name to check to see if globbable
 * @return {Promise.<boolean>} Promise of whether the service is globbable
 */
function isGlobbable(objectName) {
  function waitForFirstChildOrEnd(childrenObs) {
    return new Promise(function(resolve) {
      // resolve as soon as we find one child
      var stopWatching = mercury.watch(childrenObs, function(children) {
        if (children.length > 0) {
          resolve(true);
        }
      });

      childrenObs.events.once('end', function onChildrenEnd() {
        // no children (a no-op if a child already resolved the promise)
        resolve(false);
        stopWatching();
      });
    });
  }

  function notGlobbable() {
    return false;
  }

  return getChildren(objectName)
    .then(waitForFirstChildOrEnd)
    .catch(notGlobbable);
}
/*
 * Make an RPC call on a service object.
 * Binds to the named service and invokes the method with a fresh timeout
 * context followed by the given arguments.
 * @param {string} name Name of the service
 * @param {string} methodName Name of the service method to call
 * @param {Array} args (optional) Arguments for the service method. The array
 * is not modified; omitting it calls the method with the context only.
 * @return {Promise} Promise of whatever the service method returns. Rejects
 * when the runtime, the bind or the method call fails.
 */
function makeRPC(name, methodName, args) {
  return getRuntime().then(function bindToName(rt) {
    var ctx = veyron.context.Context().withTimeout(RPC_TIMEOUT);
    return rt.bindTo(ctx, name);
  }).then(function callMethod(service) {
    log.debug('Calling', methodName, 'on', name, 'with', args);
    var ctx = veyron.context.Context().withTimeout(RPC_TIMEOUT);
    // Build a new argument list instead of unshifting onto the caller's
    // array: args may be omitted, and a reused array must not accumulate
    // contexts across calls.
    var callArgs = [ctx].concat(args || []);
    return service[methodName].apply(null, callArgs);
  });
}
/*
 * Creates an observable struct representing basic information about
 * an item in the namespace.
 * The object name (e.g. "bar/baz/foo") is read from mountEntry.name and the
 * entry counts as a server when mountEntry.servers is non-empty.
 * The item starts with isGlobbable false; it is updated asynchronously once
 * isGlobbable() for the name resolves.
 * @param {MountEntry} mountEntry The mount entry from glob results.
 * @return item: {mercury.struct} onFinish: {Promise<bool>} Promise indicating
 * we have loaded all the information including the async ones for the item.
 * onFinish resolves to true even when the globbable check fails.
 */
function createNamespaceItem(mountEntry) {
  var objectName = mountEntry.name;
  // mounted name relative to parent
  var relativeName = namespaceUtil.basename(objectName);

  // server related information is only collected for entries with servers
  var hasServers = mountEntry.servers.length > 0;
  var info = hasServers ? getServerInfo(objectName, mountEntry) : null;

  var item = itemFactory.createItem({
    objectName: objectName,
    mountedName: relativeName,
    isGlobbable: false,
    isServer: hasServers,
    serverInfo: info
  });

  function done() {
    return true;
  }

  // find out if the object referenced by name is globbable asynchronously and
  // update the state when it comes back
  var finished = isGlobbable(objectName).then(function(globbable) {
    item.isGlobbable.set(globbable);
    return true;
  }).catch(done);

  return {
    item: item,
    onFinish: finished
  };
}
/*
 * Creates an observable struct representing information about a server such as
 * type information and endpoints, both derived from the mount entry.
 * @see item.js for details.
 * @param {string} objectName Object name to get serverInfo for (currently not
 * used by the implementation).
 * @param {MountEntry} mountEntry mount entry of the item to get serverInfo for.
 * @return {mercury.struct}
 */
function getServerInfo(objectName, mountEntry) {
  return itemFactory.createServerInfo({
    typeInfo: getServerTypeInfo(mountEntry),
    endpoints: getEndpoints(mountEntry)
  });
}
/*
 * Creates an observable struct representing information about a server type.
 * For example if a server is known to be a mounttable or a store, the struct
 * would have information such as a key, human readable name and description for
 * the type of server.
 * @see item.js for details.
 * @param {MountEntry} mountEntry mount entry to get serverTypeInfo for.
 * @return {mercury.struct} Mount table type info when mountEntry.mT is truthy,
 * the unknown-service type info otherwise.
 */
function getServerTypeInfo(mountEntry) {
  // Currently we only support detecting mounttables which is
  // based on a "MT" flag that comes from the Glob API. Mounttables are special
  // in a sense that we fundamentally "know" they are a mounttable.
  // Later when we extend the support for other services, we need to do
  // either duck typing and have a special __meta route that provides metadata
  // information about a service.
  if (!mountEntry.mT) {
    return createUnknownServiceTypeInfo();
  }

  var mounttableDescriptor = {
    key: 'veyron-mounttable',
    typeName: 'Mount Table',
    description: 'Mount table service allows registration ' +
      'and resolution of object names.'
  };
  return itemFactory.createServerTypeInfo(mounttableDescriptor);
}
/**
 * Creates an observable array with the endpoints of the mountEntry.
 * Each entry of mountEntry.servers contributes its `server` property, wrapped
 * in a mercury value.
 * @param {MountEntry} mountEntry mount entry with server endpoints.
 * @return {mercury.array} Mercury array containing the endpoints.
 */
function getEndpoints(mountEntry) {
  var endpointValues = mountEntry.servers.map(function toObservable(entry) {
    return mercury.value(entry.server);
  });
  // Convert the endpoints into a mercury list.
  return mercury.array(endpointValues);
}
/*
 * Creates the server type info used for servers whose type is not recognized:
 * key 'veyron-unknown', type name 'Service' and no description.
 * @return {mercury.struct} Whatever itemFactory.createServerTypeInfo returns
 * for that descriptor.
 */
function createUnknownServiceTypeInfo() {
  var unknownTypeDescriptor = {
    key: 'veyron-unknown',
    typeName: 'Service',
    description: null
  };
  return itemFactory.createServerTypeInfo(unknownTypeDescriptor);
}