blob: 97451fe9269ed99b4b607f5c462858b3a90234c0 [file] [log] [blame]
// Copyright 2015 The Vanadium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
var vanadium = require('vanadium');
var mercury = require('mercury');
var LRU = require('lru-cache');
var EventEmitter = require('events').EventEmitter;
var vanadiumConfig = require('../../vanadium-config');
var itemFactory = require('./item');
var freeze = require('../../lib/mercury/freeze');
var sortedPush = require('../../lib/mercury/sorted-push-array');
var log = require('../../lib/log')('services:namespace:service');
var naming = vanadium.naming;
naming.parseName = parseName;
module.exports = {
getChildren: getChildren,
getNamespaceItem: getNamespaceItem,
getRemoteBlessings: getRemoteBlessings,
getSignature: getSignature,
getAccountName: getAccountName,
getObjectAddresses: getObjectAddresses,
getPermissions: getPermissions,
resolveToMounttable: resolveToMounttable,
makeRPC: makeRPC,
search: search,
util: naming,
initVanadium: getRuntime,
clearCache: clearCache,
deleteMountPoint: deleteMountPoint,
prefixes: prefixes
};
//TODO(aghassemi) What's a good timeout? It should be shorter than this.
//Can we use ML to dynamically change the timeout?
//Should this be a user settings?
var RPC_TIMEOUT = 15 * 1000;
/*
* Lazy getter and initializer for Vanadium runtime
*/
var _runtimePromiseInstance;
function getRuntime() {
if (!_runtimePromiseInstance) {
_runtimePromiseInstance = vanadium.init(vanadiumConfig);
}
return _runtimePromiseInstance;
}
/*
* Returns the accountName for the currently logged in user
* @return {Promise.<string>}
*/
function getAccountName() {
return getRuntime().then(function(rt) {
return rt.accountName;
});
}
/*
* globCache holds (name + globQuery, result) cache entry for
* GLOB_CACHE_MAX_SIZE items in an LRU cache
*/
var GLOB_CACHE_MAX_SIZE = 10000;
var GLOB_CACHE_PREFIX = 'glob|';
var globCache = new LRU({
max: GLOB_CACHE_MAX_SIZE
});
/*
* Given a name and a glob query returns a promise of an observable array
* of items as defined in @see item.js
* As new items become available the observable array will change to reflect
* the changes.
*
* The observable result has an events property which is an EventEmitter
* and emits 'end' and 'globError' events.
*
* @param {string} pattern Glob pattern
* @return {Promise.<mercury.array>} Promise of an observable array
* of namespace items
*/
function glob(pattern) {
var cacheKey = GLOB_CACHE_PREFIX + pattern;
var cacheHit = globCache.get(cacheKey);
if (cacheHit) {
// The addition of the end event to mark the end of a glob requires that
// our cache also causes the same event to be emitted.
if (cacheHit._hasEnded) {
// Remove old listeners to avoid memory leak but now we need to set
// _hasEnded to false until we trigger again otherwise a new request
// could wipe out an outstanding listener.
cacheHit._hasEnded = false;
cacheHit.events.removeAllListeners();
process.nextTick(function() {
cacheHit.events.emit('end');
cacheHit._hasEnded = true;
});
}
return Promise.resolve(cacheHit);
}
var globItemsObservArr = mercury.array([]);
var immutableResult = freeze(globItemsObservArr);
immutableResult.events = new EventEmitter();
var ctx;
var globItemsObservArrPromise =
getRuntime().then(function callGlobOnNamespace(rt) {
ctx = rt.getContext().withTimeout(RPC_TIMEOUT);
// TODO(aghassemi) use watchGlob when available
var namespace = rt.getNamespace();
return namespace.glob(ctx, pattern).stream;
}).then(function updateResult(globStream) {
globStream.on('data', function createItem(globResult) {
// Create an item as glob results come in and add the item to result
var item = createNamespaceItem(globResult);
var existingItem = globItemsObservArr.filter(function(curItem) {
return curItem().objectName === item().objectName;
}).get(0);
if (existingItem) {
// override the old one if new item has server
if (item().hasServer) {
var index = globItemsObservArr.indexOf(existingItem);
globItemsObservArr.put(index, item);
}
} else {
var sorter = 'mountedName';
sortedPush(globItemsObservArr, item, sorter);
}
});
globStream.on('end', function() {
immutableResult.events.emit('end');
immutableResult._hasEnded = true;
});
globStream.on('error', function emitGlobErrorAndLog(err) {
immutableResult.events.emit('globError', err);
log.warn('Glob stream error for', name, err);
});
}).then(function cacheAndReturnResult() {
globCache.set(cacheKey, immutableResult);
return immutableResult;
}).catch(function invalidateCacheAndRethrow(err) {
globCache.del(cacheKey);
return Promise.reject(err);
});
// Return our Promise of observable array. It will get filled as data comes in
return globItemsObservArrPromise;
}
/*
* Given a name, provide information about a the name as defined in @see item.js
* @param {string} objectName Object name to get namespace item for.
* @return {Promise.<mercury.value>} Promise of an observable value of an item
* as defined in @see item.js
*/
function getNamespaceItem(objectName) {
// Globbing the name itself would provide information about the name.
return glob(objectName).then(function(resultsObs) {
// Wait until the glob finishes before returning the item
return new Promise(function(resolve, reject) {
resultsObs.events.on('end', function() {
var results = resultsObs();
if (results.length === 0) {
reject(new Error(objectName + ' Not Found'));
} else {
resolve(resultsObs.get(0));
}
});
});
});
}
/*
* Given a name returns a promise of an observable array of immediate children
* @param {string} parentName Object name to glob
* @return {Promise.<mercury.array>} Promise of an observable array
*/
function getChildren(parentName) {
parentName = parentName || '';
var pattern = '*';
if (parentName) {
pattern = naming.join(parentName, pattern);
}
return glob(pattern);
}
/*
* Given a name and a glob search query returns a promise of an observable array
* of items as defined in @see item.js
* As new items become available the observable array will change to reflect
* the changes.
* @param {name} parentName Object name to search in.
* @param {string} pattern Glob search pattern.
* @return {Promise.<mercury.array>} Promise of an observable array
* of namespace items
*/
function search(parentName, pattern) {
parentName = parentName || '';
if (parentName) {
pattern = naming.join(parentName, pattern);
}
return glob(pattern);
}
/*
* Given a name, provide information about its mount point permissions.
* @param {string} objectName Object name to get permissions for.
* @return {Promise.<mercury.value<vanadium.security.Permissions>>} Promise of a
* vanadium.security.Permissions object.
*/
function getPermissions(name) {
return getRuntime().then(function(rt) {
var ctx = rt.getContext().withTimeout(RPC_TIMEOUT);
var ns = rt.getNamespace();
return ns.getPermissions(ctx, name);
}).then(function(results) {
// getPermissions return multiple results, permissions is at
// outArg position 0
return mercury.value(results[0]);
});
}
/*
* Deletes a mount point.
* @param {string} name mountpoint name to delete.
* @return {Promise<void>} Success or failure promise.
*/
function deleteMountPoint(name) {
return getRuntime().then(function(rt) {
var ctx = rt.getContext().withTimeout(RPC_TIMEOUT);
var ns = rt.getNamespace();
return ns.delete(ctx, name, true);
});
}
/*
* Given a name, provide information about its mounttable objectAddress.
* @param {string} objectName Object name to get mounttable objectAddress for.
* @return {Promise.<mercury.array<string>>} Promise of an array of
* objectAddress strings.
*/
function resolveToMounttable(name) {
return getRuntime().then(function(rt) {
var ctx = rt.getContext().withTimeout(RPC_TIMEOUT);
var ns = rt.getNamespace();
return ns.resolveToMounttable(ctx, name);
}).then(function(objectAddresses) {
return mercury.array(objectAddresses);
});
}
/*
* Given a name, provide information about its objectAddresses.
* @param {string} objectName Object name to get objectAddresses for.
* @return {Promise.<mercury.array<string>>} Promise of an observable value an
* array of string objectAddresses
*/
function getObjectAddresses(name) {
return getRuntime().then(function resolve(rt) {
var resolveCtx = rt.getContext().withTimeout(RPC_TIMEOUT);
var ns = rt.getNamespace();
return ns.resolve(resolveCtx, name);
}).then(function(objectAddresses) {
return mercury.array(objectAddresses);
});
}
/*
* remoteBlessingsCache holds (name, []string) cache entry for
* REMOTE_BLESSINGS_CACHE_MAX_SIZE items in an LRU cache
*/
var REMOTE_BLESSINGS_CACHE_MAX_SIZE = 10000;
var REMOTE_BLESSINGS_PREFIX = 'getRemoteBlessings|';
var remoteBlessingsCache = new LRU({
max: REMOTE_BLESSINGS_CACHE_MAX_SIZE
});
/*
* Given an object name, returns a promise of the service's remote blessings.
* @param {string} objectName Object name to get remote blessings for
* @return {[]string} remoteBlessings The service's remote blessings.
*/
function getRemoteBlessings(objectName) {
var cacheKey = REMOTE_BLESSINGS_PREFIX + objectName;
var cacheHit = remoteBlessingsCache.get(cacheKey);
if (cacheHit) {
return Promise.resolve(cacheHit);
}
return getRuntime().then(function invokeRemoteBlessingsMethod(rt) {
var ctx = rt.getContext().withTimeout(RPC_TIMEOUT);
var client = rt.getClient();
return client.remoteBlessings(ctx, objectName);
}).then(function cacheAndReturnRemoteBlessings(remoteBlessings) {
// Remote Blessings is []string representing the principals of the service.
remoteBlessingsCache.set(cacheKey, remoteBlessings);
return remoteBlessings;
});
}
/*
* signatureCache holds (name, signature) cache entry for
* SIGNATURE_CACHE_MAX_SIZE items in an LRU cache
*/
var SIGNATURE_CACHE_MAX_SIZE = 10000;
var SIGNATURE_CACHE_PREFIX = 'getSignature|';
var signatureCache = new LRU({
max: SIGNATURE_CACHE_MAX_SIZE
});
/*
* Given a object name, returns a promise of the signature of methods available
* on the object represented by that name.
* @param {string} objectName Object name to get signature for
* @return {signature} signature for the object represented by the given name
*/
function getSignature(objectName) {
var cacheKey = SIGNATURE_CACHE_PREFIX + objectName;
var cacheHit = signatureCache.get(cacheKey);
if (cacheHit) {
return Promise.resolve(cacheHit);
}
return getRuntime().then(function invokeSignatureMethod(rt) {
var ctx = rt.getContext().withTimeout(RPC_TIMEOUT);
var client = rt.getClient();
return client.signature(ctx, objectName);
}).then(function cacheAndReturnSignature(signature) {
// Signature is []interface; each interface contains method data.
signatureCache.set(cacheKey, signature);
return signature;
});
}
/*
* Make an RPC call on a service object.
* name: string representing the name of the service
* methodName: string for the service method name
* args (optional): array of arguments for the service method
*/
function makeRPC(name, methodName, args) {
// Adapt the method name to be lowercase again.
methodName = methodName[0].toLowerCase() + methodName.substr(1);
var ctx;
return getRuntime().then(function bindToName(rt) {
ctx = rt.getContext();
var client = rt.getClient();
return client.bindTo(ctx, name);
}).then(function callMethod(service) {
log.debug('Calling', methodName, 'on', name, 'with', args);
args.unshift(ctx.withTimeout(RPC_TIMEOUT));
return service[methodName].apply(null, args);
}).then(function returnResult(result) {
// If the result was for 0 outArg, then this returns undefined.
// If the result was for 1 outArg, then it gets a single output.
// If the result was for >1 outArgs, then we return []output.
return result;
});
}
/*
* Creates an observable struct representing basic information about
* an item in the namespace.
* @see item.js
* @param {MountEntry} mountEntry The mount entry from glob results.
* @return {Mercury.struct} observable struct representing basic information
* about an item.
*/
function createNamespaceItem(mountEntry) {
var name = mountEntry.name;
// mounted name relative to parent
var mountedName = naming.basename(name);
var isLeaf = mountEntry.isLeaf;
var hasServer = mountEntry.servers.length > 0 ||
!mountEntry.servesMountTable;
var hasMountPoint = mountEntry.servers.length > 0 ||
mountEntry.servesMountTable;
var isMounttable = mountEntry.servers.length > 0 &&
mountEntry.servesMountTable;
var item = itemFactory.createItem({
objectName: name,
mountedName: mountedName,
isLeaf: isLeaf,
hasServer: hasServer,
hasMountPoint: hasMountPoint,
isMounttable: isMounttable
});
return item;
}
/*
* Given an arbitrary Vanadium name, parses it into an array
* of strings.
* For example, if name is "/ns.dev.v.io:8101/global/rps"
* returns ["ns.dev.v.io:8101", "global", "rps"]
* Can use namespaceService.util.isRooted to see if the name
* is rooted (begins with a slash).
* Note that the address part can contain slashes.
*/
function parseName(name) {
var splitName = naming.splitAddressName(name);
var namespaceParts = [];
if (splitName.address) {
namespaceParts.push(splitName.address);
}
if (splitName.suffix) {
var suffixParts = splitName.suffix.split('/');
namespaceParts = namespaceParts.concat(suffixParts);
}
return namespaceParts;
}
/*
* Clears all caches (glob, signature, etc...) for the given name and all of its
* descendants.
* @param {string} parentName Name to clear caches for. If no name is given, all
* caches are cleared.
*/
function clearCache(parentName) {
if (!parentName) {
globCache.reset();
remoteBlessingsCache.reset();
signatureCache.reset();
return;
}
clearByPrefix(globCache, GLOB_CACHE_PREFIX + parentName);
clearByPrefix(remoteBlessingsCache, REMOTE_BLESSINGS_PREFIX + parentName);
clearByPrefix(signatureCache, SIGNATURE_CACHE_PREFIX + parentName);
function clearByPrefix(cache, parent) {
var keys = cache.keys();
keys.forEach(function(key) {
if (prefixes(parent, key)) {
cache.del(key);
}
});
}
}
/*
* Returns true iff parentName is a parent of childName or is same as childName
*/
function prefixes(parentName, childName) {
return (parentName === childName) ||
(childName.indexOf(naming.clean(parentName) + '/') === 0);
}