// blob: d069336d1d8c2b887ec42e41941d57c681b46712 [file] [log] [blame]
// Copyright 2015 The Vanadium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
module.exports = Channel;
var _ = require('lodash');
var EventEmitter = require('events').EventEmitter;
var inherits = require('inherits');
var path = require('path');
var access = require('vanadium/src/gen-vdl/v.io/v23/security/access');
var naming = require('vanadium').naming;
var noop = require('./noop');
var ServiceVdl = require('./chat/vdl');
var util = require('./util');
// Member describes one participant of the channel.
//
//   blessings - the member's blessings, stored as given.
//   path      - the name at which the member is mounted in the mounttable.
//
// The member's display name is derived from the blessings with
// util.firstShortName.
function Member(blessings, path) {
  var shortName = util.firstShortName(blessings);

  // Properties are assigned in a fixed order: blessings, name, path.
  this.blessings = blessings;
  this.name = shortName;
  this.path = path;
}
// memberNames takes an array of members and returns a sorted list of their
// names. The input array is left untouched; a new array is returned.
function memberNames(members) {
  var names = _.map(members, function(m) {
    return m.name;
  });
  // Default sort order (string comparison).
  names.sort();
  return names;
}
// Channel encapsulates the logic for a client of the Vanadium Chat. It
// inherits from EventEmitter and emits 'members', 'message', and 'ready'
// events.
//
//   rt          - the runtime object; this constructor reads rt.accountName
//                 and calls rt.namespace(), rt.getContext(), rt.newClient()
//                 and rt.newServer().
//   channelName - name of the channel; it is joined under 'apps/chat'.
function Channel(rt, channelName) {
  EventEmitter.call(this);

  // Name prefix under which the members of this channel are mounted.
  this.channelName_ = path.join('apps/chat', channelName);
  this.accountName_ = rt.accountName;
  this.namespace_ = rt.namespace();
  this.context_ = rt.getContext();
  this.client_ = rt.newClient();
  this.server_ = rt.newServer();

  // Becomes true once the first non-empty member list has been seen.
  this.ready_ = false;
  // Members found by the most recent glob of the channel.
  this.members_ = [];
  // Handle of the periodic member refresh; null while not joined.
  this.intervalID_ = null;
  // Name this client is mounted under; set by join() and read by leave().
  // Initialized here so the field always exists on the instance.
  this.mountedName_ = null;
}
inherits(Channel, EventEmitter);
// join serves the chat service under a random "locked" name obtained from
// getLockedName_, then starts refreshing the member list every 2 seconds.
//
//   cb - optional callback; called with an error if locking a name or serving
//        fails, and with no arguments once the service is being served.
Channel.prototype.join = function(cb) {
  var self = this;
  var done = cb || noop;

  // Our service implementation defines a single method, "sendMessage". It
  // inherits from ServiceVdl.Chat, which causes our defined VDL types to be
  // used when serializing values on the wire. This is necessary for the web
  // client to be able to communicate with the shell client.
  function Service() {
    ServiceVdl.Chat.call(this);
  }
  inherits(Service, ServiceVdl.Chat);

  // sendMessage re-emits an incoming message on the channel, tagged with the
  // sender's short name and the time of receipt.
  Service.prototype.sendMessage = function(ctx, text) {
    var message = {
      sender: util.firstShortName(ctx.remoteBlessingStrings),
      text: text,
      timestamp: new Date()
    };
    self.emit('message', message);
  };

  // openAuthorizer allows RPCs from all clients.
  var openAuthorizer = function(secCall, cb) {
    cb(null);
  };
  var serveOptions = {authorizer: openAuthorizer};

  // Runs once serve() has finished. On success, refresh the member list
  // right away and then keep refreshing it periodically.
  function onServed(err) {
    if (err) {
      return done(err);
    }
    self.updateMembers_();
    self.intervalID_ = setInterval(self.updateMembers_.bind(self), 2000);
    return done();
  }

  // Runs once a locked name has been obtained (or could not be).
  function onLockedName(err, name) {
    if (err) {
      return done(err);
    }
    self.mountedName_ = name;
    // Serve the chat service under the locked name.
    // Note, serve() performs the mount() for us.
    self.server_.serve(name, new Service(), serveOptions, onServed);
  }

  // Get a locked name to mount under.
  this.getLockedName_(onLockedName);
};
// getLockedName_ picks a random name under the channel name and attempts to
// "lock" it by setting restrictive permissions on it. It retries with a new
// random name each time setPermissions fails, up to 25 attempts.
//
//   cb - called with (null, name) once a name has been locked, or with an
//        Error once all attempts have failed. The Error message includes the
//        last setPermissions error so the underlying cause is not lost.
Channel.prototype.getLockedName_ = function(cb) {
  // openACL gives everybody permission.
  var openACL = new access.AccessList({
    'in': ['...']
  });
  // myACL only gives my blessings and descendants permission.
  var myACL = new access.AccessList({
    'in': [this.accountName_]
  });
  // Create a tagged acl map with the desired permissions.
  var tam = new access.Permissions(new Map([
    // Give everybody the ability to read and resolve the name.
    ['Read', openACL],
    ['Resolve', openACL],
    // All other permissions are only for us.
    ['Admin', myACL],
    ['Create', myACL],
    ['Mount', myACL]
  ]));
  var that = this;
  // Repeatedly pick random names and try to setPermissions on them until we
  // get one that has not already been locked.
  var maxRetries = 25;
  function attemptToGetName(tries, lastErr) {
    if (tries >= maxRetries) {
      var message = 'Tried ' + maxRetries + ' times to get an unlocked name ' +
        'but did not succeed.';
      if (lastErr) {
        message += ' Last error: ' + (lastErr.message || lastErr);
      }
      return cb(new Error(message));
    }
    // Choose a random name under the channel name.
    var name = path.join(that.channelName_, util.randomHex(32));
    // Each attempt gets its own context with a 5000 timeout.
    var ctx = that.context_.withTimeout(5000);
    that.namespace_.setPermissions(ctx, name, tam, '', function(err) {
      ctx.done();
      if (err) {
        // Try again, remembering why this attempt failed.
        return attemptToGetName(tries + 1, err);
      }
      return cb(null, name);
    });
  }
  attemptToGetName(0);
};
// leave stops the periodic member refresh, deletes our name from the
// mounttable, and stops the chat server.
//
//   cb - optional callback, handed to server_.stop(). The outcome of the
//        mounttable delete is not reported (its callback is a no-op).
Channel.prototype.leave = function(cb) {
  var channel = this;
  var onStopped = cb || noop;

  // Stop updating member names.
  var refreshTimer = channel.intervalID_;
  if (refreshTimer) {
    clearInterval(refreshTimer);
    channel.intervalID_ = null;
  }

  // Delete our name from the mounttable, if we ever got one.
  var mounted = channel.mountedName_;
  if (mounted) {
    channel.namespace_.delete(channel.context_, mounted, true, noop);
  }

  // Stop the server.
  channel.server_.stop(onStopped);
};
// sendMessageTo sends a message to a particular member.
//
//   member      - the recipient; member.path is bound to and
//                 member.blessings is used as the allowed-servers policy.
//   messageText - the text to deliver.
//   cb          - optional callback; called with an error if binding or the
//                 remote call fails, and with null on success.
//
// ctx.done() is invoked on every completion path (bind failure, call failure
// and success) before cb is called.
Channel.prototype.sendMessageTo = function(member, messageText, cb) {
  cb = cb || noop;
  // The allowedServersPolicy options require that the server matches the
  // blessings we got when we globbed it.
  var callOpts = this.client_.callOptions({
    allowedServersPolicy: member.blessings
  });
  // One context, with a 5000 timeout, covers both the bind and the call.
  var ctx = this.context_.withTimeout(5000);
  // Bind to the member's chat server.
  this.client_.bindTo(ctx, member.path, function(err, s) {
    if (err) {
      ctx.done();
      return cb(err);
    }
    // Invoke sendMessage on the member's chat server with messageText.
    s.sendMessage(ctx, messageText, callOpts, function(err) {
      // Finish with the context whether or not the call succeeded.
      ctx.done();
      if (err) {
        return cb(err);
      }
      return cb(null);
    });
  });
};
// broadcastMessage sends a message to all members currently known to the
// channel.
//
//   messageText - the text to deliver to every member.
//   cb          - optional callback; always called with null, right after the
//                 sends have been scheduled. Results of the individual sends
//                 are not reported to it.
//
// Returns whatever cb returns.
Channel.prototype.broadcastMessage = function(messageText, cb) {
  var channel = this;
  var done = cb || noop;

  // Defer one send to the next tick so this call returns without waiting.
  function scheduleSend(recipient) {
    process.nextTick(function() {
      channel.sendMessageTo(recipient, messageText);
    });
  }

  // Schedule all messages to be sent, then return immediately.
  _.forEach(channel.members_, scheduleSend);
  return done(null);
};
// updateMembers_ globs the mounttable and constructs member objects for each
// server in the glob results. Channel will emit 'members' event if the
// members have changed since the last glob, and 'ready' the first time a
// non-empty member list has been received.
//
// Takes no arguments and returns nothing; glob failures are logged with
// console.error and otherwise ignored.
Channel.prototype.updateMembers_ = function() {
  var that = this;
  // We glob on the channel name to find all the members in our channel. The
  // glob will return a stream of mount entries corresponding to the paths
  // where channel members are mounted. Then, we get the remote blessings from
  // each member, and use those as the member names.
  var ctx = this.context_.withTimeout(1000);
  var pattern = path.join(this.channelName_, '*');
  // Start the glob rpc. This returns a promise with a "stream" property.
  // Mount entries are emitted on the stream.
  var globRpc = this.namespace_.glob(ctx, pattern);
  var globStream = globRpc.stream;
  globRpc.catch(function(err) {
    console.error(err);
  });
  var newMembers = [];
  // Each time we get a mount entry, we construct a new Member object.
  globStream.on('data', function(mountEntry) {
    if (!mountEntry.servers || mountEntry.servers.length === 0) {
      // No servers mounted at that name, only a lonely ACL. Safe to ignore.
      return;
    }
    // TODO(ashankar,nlacasse): Check with p@ and figure out if
    // mountEntry.servers[0].server can have a "suffix" after the address.
    // (e.g., @4@...@@/foo/bar).
    // This won't be the case for the chat application, but can be in general?
    var addr = mountEntry.servers[0].server;
    var blessings = naming.util.blessingNamesFromAddress(addr);
    // Named mountedPath so it does not shadow the module-level "path".
    var mountedPath = mountEntry.name;
    newMembers.push(new Member(blessings, mountedPath));
  });
  // An 'error' event emitted on an emitter with no 'error' listener is
  // thrown; listen for it so a failed glob is only logged.
  globStream.on('error', function(err) {
    console.error(err);
  });
  globStream.on('end', done);
  function done(err) {
    // Defensive: log an error argument if the 'end' event ever carries one.
    if (err) {
      console.error(err);
    }
    if (newMembers.length === 0) {
      // No glob results, not even us! Don't emit anything yet.
      return;
    }
    // Get the member names for the old and new members and if they differ
    // emit a "members" event which the UI will use to update the members
    // list.
    var oldMemberNames = memberNames(that.members_);
    that.members_ = newMembers;
    var newMemberNames = memberNames(that.members_);
    if (!_.isEqual(oldMemberNames, newMemberNames)) {
      that.emit('members', newMemberNames);
    }
    if (!that.ready_) {
      that.ready_ = true;
      that.emit('ready');
    }
  }
};