// blob: ce2c35fea14a57fdeb24cf24da89132321a8a205 [file] [log] [blame]
// Copyright 2015 The Vanadium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
module.exports = Database;
var through2 = require('through2');
var vanadium = require('vanadium');
// TODO(nlacasse): We should put unwrap and other type util methods on
// vanadium.vdl object.
var unwrap = require('vanadium/src/vdl/type-util').unwrap;
var verror = vanadium.verror;
var nosqlVdl = require('../gen-vdl/v.io/syncbase/v23/services/syncbase/nosql');
var watchVdl = require('../gen-vdl/v.io/v23/services/watch');
var BatchDatabase = require('./batch-database');
/* jshint -W079 */
// Silence jshint's error about redefining 'Blob'.
var Blob = require('./blob');
/* jshint +W079 */
var SyncGroup = require('./syncgroup');
var Table = require('./table');
var util = require('../util');
var watch = require('./watch');
/**
 * Database represents a collection of Tables. Batches, queries, sync, watch,
 * etc. all operate at the Database level.
 *
 * May be invoked with or without `new`; both forms yield an equivalent
 * instance, including the schema.
 *
 * @constructor
 * @param {string} parentFullName Full name of App which contains this
 * Database.
 * @param {string} relativeName Relative name of this Database. Must not
 * contain slashes.
 * @param {module:syncbase.schema.Schema} schema Schema for the database.
 * When omitted, schemaVersion is set to -1.
 */
function Database(parentFullName, relativeName, schema) {
  if (!(this instanceof Database)) {
    // Forward every argument: dropping `schema` here would silently produce
    // a handle whose schemaVersion is -1.
    return new Database(parentFullName, relativeName, schema);
  }

  util.addNameProperties(this, parentFullName, relativeName);

  // Read-only, non-enumerable schema supplied by the caller (may be
  // undefined).
  Object.defineProperty(this, 'schema', {
    enumerable: false,
    value: schema,
    writable: false
  });

  // Version taken from the schema metadata; -1 when no schema was given.
  Object.defineProperty(this, 'schemaVersion', {
    enumerable: false,
    value: schema ? schema.metadata.version : -1,
    writable: false
  });

  /**
   * Caches the database wire object.
   * @private
   */
  Object.defineProperty(this, '_wireObj', {
    enumerable: false,
    value: null,
    writable: true
  });
}
/**
 * Returns the wire (RPC stub) object for this Database, binding it on first
 * use and reusing the cached stub stored in _wireObj afterwards.
 * @private
 * @param {module:vanadium.context.Context} ctx Vanadium context.
 */
Database.prototype._wire = function(ctx) {
  if (!this._wireObj) {
    var rpcClient = vanadium.runtimeForContext(ctx).newClient();
    var serviceSignature = [nosqlVdl.Database.prototype._serviceDescription];
    this._wireObj = rpcClient.bindWithSignature(this.fullName,
                                                serviceSignature);
  }
  return this._wireObj;
};
/**
 * Creates this Database, sending the schema metadata (or null when this
 * handle has no schema) along with the requested permissions.
 * @param {module:vanadium.context.Context} ctx Vanadium context.
 * @param {module:vanadium.security.access.Permissions} perms Permissions for
 * the new database. If perms is null, we inherit (copy) the App perms.
 * @param {function} cb Callback.
 */
Database.prototype.create = function(ctx, perms, cb) {
  var metadata = this.schema ? this.schema.metadata : null;
  this._wire(ctx).create(ctx, metadata, perms, cb);
};
/**
 * Deletes this Database. The handle's schemaVersion is sent with the request.
 * @param {module:vanadium.context.Context} ctx Vanadium context.
 * @param {function} cb Callback.
 */
Database.prototype.delete = function(ctx, cb) {
  var wire = this._wire(ctx);
  wire.delete(ctx, this.schemaVersion, cb);
};
/**
 * Reports whether this Database exists; the result is true only if it does.
 * Insufficient permissions cause exists to return false instead of an error.
 * TODO(ivanpi): exists may fail with an error if higher levels of hierarchy
 * do not exist.
 * @param {module:vanadium.context.Context} ctx Vanadium context.
 * @param {function} cb Callback.
 */
Database.prototype.exists = function(ctx, cb) {
  var wire = this._wire(ctx);
  wire.exists(ctx, this.schemaVersion, cb);
};
/**
 * Executes a syncQL query.
 *
 * Returns a stream of rows. The first row contains an array of headers (i.e.
 * column names). Subsequent rows contain an array of values for each row that
 * matches the query. The number of values returned in each row will match the
 * size of the headers array. Every element of every row is passed through
 * `unwrap` before being emitted.
 *
 * Concurrency semantics: It is legal to perform writes concurrently with
 * Exec. The returned stream reads from a consistent snapshot taken at the
 * time of the RPC, and will not reflect subsequent writes to keys not yet
 * reached by the stream.
 *
 * NOTE(nlacasse): The Go client library returns the headers separately from
 * the stream. We could potentially do something similar in JavaScript, by
 * pulling the headers off the stream and passing them to the callback.
 * However, by Vanadium JS convention the callback gets called at the *end* of
 * the RPC, so a developer would have to wait for the stream to finish before
 * seeing what the headers are, which is not ideal. We also cannot return the
 * headers directly because reading from the stream is async.
 *
 * TODO(nlacasse): Syncbase queries don't work on values that were put without
 * type information. When JavaScript encodes values with no type information,
 * it uses "vdl.Value" for the type. Presumably, syncbase does not know how to
 * decode such objects, so queries that involve inspecting the object or its
 * type don't work.
 *
 * @param {module:vanadium.context.Context} ctx Vanadium context.
 * @param {string} query Query string.
 * @param {function} cb Callback.
 * @returns {stream} Stream of rows.
 */
Database.prototype.exec = function(ctx, query, cb) {
  // Object-mode transform that unwraps each value in a row.
  var rowUnwrapper = through2({
    objectMode: true
  }, function(row, encoding, done) {
    return done(null, row.map(unwrap));
  });

  var rpcCall = this._wire(ctx).exec(ctx, this.schemaVersion, query, cb);
  var rawRows = rpcCall.stream;
  var unwrappedRows = rawRows.pipe(rowUnwrapper);

  // Re-emit source errors on the stream handed back to the caller.
  rawRows.on('error', function(streamErr) {
    unwrappedRows.emit('error', streamErr);
  });

  return unwrappedRows;
};
/**
 * Returns a handle to the Table with the given name. The Table is constructed
 * with this Database's full name and schemaVersion.
 * @param {string} relativeName Table name. Must not contain slashes.
 * @return {module:syncbase.table.Table} Table object.
 */
Database.prototype.table = function(relativeName) {
  var databaseName = this.fullName;
  var version = this.schemaVersion;
  return new Table(databaseName, relativeName, version);
};
/**
 * Returns a list of all Table names, obtained by listing the child names
 * under this Database's full name.
 * @param {module:vanadium.context.Context} ctx Vanadium context.
 * @param {function} cb Callback.
 */
Database.prototype.listTables = function(ctx, cb) {
  var databaseName = this.fullName;
  util.getChildNames(ctx, databaseName, cb);
};
/**
 * Binds and returns a Table wire (RPC stub) object for the named table
 * inside this Database. A new client and stub are created on every call.
 * @private
 * @param {module:vanadium.context.Context} ctx Vanadium context.
 * @param {string} relativeName Table name. Must not contain slashes.
 * @throws {Error} If relativeName contains a slash.
 */
Database.prototype._tableWire = function(ctx, relativeName) {
  var hasSlash = relativeName.indexOf('/') !== -1;
  if (hasSlash) {
    throw new Error('relativeName must not contain slashes.');
  }

  var rpcClient = vanadium.runtimeForContext(ctx).newClient();
  var tableSignature = [nosqlVdl.Table.prototype._serviceDescription];
  var tableFullName = vanadium.naming.join(this.fullName, relativeName);

  return rpcClient.bindWithSignature(tableFullName, tableSignature);
};
// TODO(nlacasse): It's strange that we create a Database with:
// var db = new Database();
// db.create();
// But we create a Table with:
// db.createTable();
// The .delete method is similarly confusing. db.delete deletes a database,
// but table.delete deletes a row (or row range).
// Consider putting all 'create' and 'delete' methods on the parent class for
// consistency.
// TODO(aghassemi): If we keep this, it should return "table" in the CB instead
// of being void.
/**
 * Creates the specified Table.
 * @param {module:vanadium.context.Context} ctx Vanadium context.
 * @param {string} relativeName Table name. Must not contain slashes.
 * @param {module:vanadium.security.access.Permissions} perms Permissions for
 * the new table. If perms is null, we inherit (copy) the Database perms.
 * @param {function} cb Callback.
 */
Database.prototype.createTable = function(ctx, relativeName, perms, cb) {
  var tableWire = this._tableWire(ctx, relativeName);
  tableWire.create(ctx, this.schemaVersion, perms, cb);
};
/**
 * Deletes the specified Table.
 * @param {module:vanadium.context.Context} ctx Vanadium context.
 * @param {string} relativeName Relative name of Table to delete. Must not
 * contain slashes.
 * @param {function} cb Callback.
 */
Database.prototype.deleteTable = function(ctx, relativeName, cb) {
  var tableWire = this._tableWire(ctx, relativeName);
  tableWire.delete(ctx, this.schemaVersion, cb);
};
/**
 * Watches for updates to the database. For each watch request, the client will
 * receive a reliable stream of watch events without re-ordering.
 *
 * This method is designed to be used in the following way:
 * 1) begin a read-only batch
 * 2) read all information your app needs
 * 3) read the ResumeMarker
 * 4) abort the batch
 * 5) start watching for changes to the data using the ResumeMarker
 *
 * In this configuration the client doesn't miss any changes.
 *
 * Each wire-level change is converted into a WatchChange whose changeType is
 * 'put' or 'delete'; any other change state makes the returned stream fail
 * with an 'invalid change state' error.
 *
 * @param {module:vanadium.context.Context} ctx Vanadium context.
 * @param {string} tableName Name of table to watch.
 * @param {string} prefix Prefix of keys to watch.
 * @param {module:syncbase.nosql.watch.ResumeMarker} resumeMarker ResumeMarker
 * to resume watching from.
 * @param {function} [cb] Optional callback that will be called after watch RPC
 * finishes.
 * @returns {stream} Stream of WatchChange objects.
 */
Database.prototype.watch = function(ctx, tableName, prefix, resumeMarker, cb) {
  // The glob pattern matches every key under tableName that starts with
  // prefix.
  var request = new watchVdl.GlobRequest({
    pattern: vanadium.naming.join(tableName, prefix + '*'),
    resumeMarker: resumeMarker
  });

  // Object-mode transform turning wire changes into WatchChange objects.
  var toWatchChange = through2({
    objectMode: true
  }, function(change, encoding, done) {
    var isPut = change.state === watchVdl.Exists.val;
    if (!isPut && change.state !== watchVdl.DoesNotExist.val) {
      return done(new Error('invalid change state ' + change.state));
    }

    var converted = new watch.WatchChange({
      tableName: vanadium.naming.stripBasename(change.name),
      rowName: vanadium.naming.basename(change.name),
      changeType: isPut ? 'put' : 'delete',
      // Value bytes are only populated for puts.
      valueBytes: isPut ? change.value.value : null,
      resumeMarker: change.resumeMarker,
      fromSync: change.value.fromSync,
      continued: change.continued
    });
    return done(null, converted);
  });

  var rawChanges = this._wire(ctx).watchGlob(ctx, request, cb).stream;
  var convertedChanges = rawChanges.pipe(toWatchChange);

  // Re-emit source errors on the stream handed back to the caller.
  rawChanges.on('error', function(streamErr) {
    convertedChanges.emit('error', streamErr);
  });

  return convertedChanges;
};
/**
 * Gets the ResumeMarker that points to the current end of the event log.
 * @param {module:vanadium.context.Context} ctx Vanadium context.
 * @param {function} cb Callback.
 */
Database.prototype.getResumeMarker = function(ctx, cb) {
  var wire = this._wire(ctx);
  wire.getResumeMarker(ctx, cb);
};
/**
 * Replaces the current Permissions for the Database.
 * @param {module:vanadium.context.Context} ctx Vanadium context.
 * @param {module:vanadium.security.access.Permissions} perms Permissions for
 * the database.
 * @param {string} version Version of the current Permissions object which will
 * be overwritten. If empty, SetPermissions will perform an unconditional
 * update.
 * @param {function} cb Callback.
 */
Database.prototype.setPermissions = function(ctx, perms, version, cb) {
  var wire = this._wire(ctx);
  wire.setPermissions(ctx, perms, version, cb);
};
/**
 * Returns the current Permissions for the Database.
 * @param {module:vanadium.context.Context} ctx Vanadium context.
 * @param {function} cb Callback.
 */
Database.prototype.getPermissions = function(ctx, cb) {
  var wire = this._wire(ctx);
  wire.getPermissions(ctx, cb);
};
/**
 * Creates a new batch. Instead of calling this function directly, clients are
 * encouraged to use the RunInBatch() helper function, which detects "concurrent
 * batch" errors and handles retries internally.
 *
 * Default concurrency semantics:
 * - Reads (e.g. gets, scans) inside a batch operate over a consistent snapshot
 *   taken during beginBatch(), and will see the effects of prior writes
 *   performed inside the batch.
 * - commit() may fail with errConcurrentBatch, indicating that after
 *   beginBatch() but before commit(), some concurrent routine wrote to a key
 *   that matches a key or row-range read inside this batch.
 * - Other methods will never fail with error errConcurrentBatch, even if it is
 *   known that commit() will fail with this error.
 *
 * Once a batch has been committed or aborted, subsequent method calls will
 * fail with no effect.
 *
 * Concurrency semantics can be configured using BatchOptions.
 *
 * On success the callback receives a BatchDatabase wrapping a new Database
 * handle named after the batch.
 *
 * @param {module:vanadium.context.Context} ctx Vanadium context.
 * @param {module:vanadium.syncbase.nosql.BatchOptions} opts BatchOptions.
 * @param {function} cb Callback.
 */
Database.prototype.beginBatch = function(ctx, opts, cb) {
  var parentName = function(db) {
    return db._parentFullName;
  }.bind(null, this);

  function onBatchStarted(err, batchRelativeName) {
    if (err) {
      return cb(err);
    }

    // The server hands back a relative name that differs from this
    // database's own relative name, so the batch needs its own Database
    // handle, which is then wrapped in a BatchDatabase.
    // NOTE(review): the handle is built without a schema, so its
    // schemaVersion is -1 -- confirm this is intended.
    var batchDb = new Database(parentName(), batchRelativeName);
    return cb(null, new BatchDatabase(batchDb));
  }

  this._wire(ctx).beginBatch(ctx, this.schemaVersion, opts, onBatchStarted);
};
/**
 * Gets a handle to the SyncGroup with the given name. The handle is
 * constructed from this Database and the name.
 *
 * @param {string} name SyncGroup name.
 */
Database.prototype.syncGroup = function(name) {
  var db = this;
  return new SyncGroup(db, name);
};
/**
 * Gets the global names of all SyncGroups attached to this database.
 * @param {module:vanadium.context.Context} ctx Vanadium context.
 * @param {function} cb Callback.
 */
Database.prototype.getSyncGroupNames = function(ctx, cb) {
  var wire = this._wire(ctx);
  wire.getSyncGroupNames(ctx, cb);
};
/**
 * Compares the current schema version of the database with the schema version
 * provided while creating this database handle. If the current database schema
 * version is lower, then schema.upgrader is called. If schema.upgrader is
 * successful this method stores the new schema metadata in database.
 *
 * The callback receives (err, upgraded): upgraded is true only when the
 * upgrader ran and the new metadata was stored, and false otherwise.
 *
 * It is important not to access or modify the database until upgradeIfOutdated
 * has called its callback.
 *
 * TODO(nlacasse): Consider locking the database in some way so that the
 * upgrader function can access it, but all other attempts fail immediately
 * with a helpful error.
 *
 * Note: a schema is required. If this handle was created without one, or the
 * schema version is negative, the callback is invoked asynchronously with a
 * BadStateError.
 * @param {module:vanadium.context.Context} ctx Vanadium context.
 * @param {function} cb Callback.
 */
Database.prototype.upgradeIfOutdated = function(ctx, cb) {
  var db = this;
  var schema = db.schema;

  // Reports a BadStateError on a later tick so the callback is never invoked
  // synchronously.
  function failLater(message) {
    return process.nextTick(function() {
      cb(new verror.BadStateError(ctx, message));
    });
  }

  // Stores the handle's metadata when the database has none yet.
  function storeInitialMetadata() {
    db._setSchemaMetadata(ctx, schema.metadata, function(setErr) {
      // The database may not yet exist. If so the call above fails with
      // NoExistError, which is not reported to the caller. Any other error
      // is passed on.
      if (setErr && !(setErr instanceof verror.NoExistError)) {
        return cb(setErr);
      }
      return cb(null, false);
    });
  }

  // Runs the app-provided upgrader, then records the new metadata.
  //
  // TODO(nlacasse,jlodhia): disable sync before running Upgrader and
  // reenable once Upgrader is finished.
  //
  // TODO(nlacasse,jlodhia): prevent other processes (local/remote) from
  // accessing the database while upgrade is in progress.
  function runUpgrader(storedVersion) {
    schema.upgrader(db, storedVersion, schema.metadata.version,
      function(upgradeErr) {
        if (upgradeErr) {
          return cb(upgradeErr);
        }
        // Update the schema metadata in db to the latest version.
        db._setSchemaMetadata(ctx, schema.metadata, function(setErr) {
          if (setErr) {
            return cb(setErr);
          }
          cb(null, true);
        });
      });
  }

  if (!schema) {
    return failLater(
      'schema or schema.metadata cannot be nil. ' +
      'A valid schema needs to be used when creating Database handle.');
  }

  if (schema.metadata.version < 0) {
    return failLater('schema version cannot be less than zero');
  }

  db._getSchemaMetadata(ctx, function(getErr, storedMeta) {
    if (getErr) {
      // If the client app did not set a schema as part of create db,
      // getSchemaMetadata() fails with NoExistError. In that case the schema
      // is set here; every other error is returned as-is.
      if (getErr instanceof verror.NoExistError) {
        storeInitialMetadata();
        return;
      }
      return cb(getErr);
    }

    // Nothing to do when the stored version is already current or newer.
    if (storedMeta.version >= schema.metadata.version) {
      return cb(null, false);
    }

    runUpgrader(storedMeta.version);
  });
};
/**
 * Retrieves the schema metadata for the database. Returns whatever the
 * underlying wire call returns.
 * @private
 * @param {module:vanadium.context.Context} ctx Vanadium context.
 * @param {function} cb Callback.
 */
Database.prototype._getSchemaMetadata = function(ctx, cb) {
  var wire = this._wire(ctx);
  return wire.getSchemaMetadata(ctx, cb);
};
/**
 * Stores the schema metadata for the database. Returns whatever the
 * underlying wire call returns.
 * @private
 * @param {module:vanadium.context.Context} ctx Vanadium context.
 * @param {module:syncbase.schema.SchemaMetadata} metadata Schema metadata.
 * @param {function} cb Callback.
 */
Database.prototype._setSchemaMetadata = function(ctx, metadata, cb) {
  var wire = this._wire(ctx);
  return wire.setSchemaMetadata(ctx, metadata, cb);
};
/**
 * Returns a handle to the blob with the given blobRef. The handle is
 * constructed from this Database and the blobRef.
 * @param {module:syncbase.nosql.BlobRef} blobRef BlobRef of blob to get.
 */
Database.prototype.blob = function(blobRef) {
  var db = this;
  return new Blob(db, blobRef);
};
/**
 * Creates a new blob. On success the callback receives a Blob handle built
 * from this Database and the BlobRef returned by the RPC.
 * @param {module:vanadium.context.Context} ctx Vanadium context.
 * @param {function} cb Callback.
 */
Database.prototype.createBlob = function(ctx, cb) {
  var db = this;

  function onBlobCreated(createErr, newBlobRef) {
    if (createErr) {
      return cb(createErr);
    }
    return cb(null, new Blob(db, newBlobRef));
  }

  db._wire(ctx).createBlob(ctx, onBlobCreated);
};