Support for schemas in syncbase js client.

Client changes were modeled after those in go/vcl/13040

Change-Id: I7887ea01390a395293cfef9f93f968cca4bf91e0
diff --git a/src/app.js b/src/app.js
index 3357eae..0b4c536 100644
--- a/src/app.js
+++ b/src/app.js
@@ -31,9 +31,10 @@
 }
 
 // noSqlDatabase returns the noSqlDatabase with the given name. relativeName
-// must not contain slashes.
-App.prototype.noSqlDatabase = function(relativeName) {
-  return new Database(this.fullName, relativeName);
+// must not contain slashes. schema can be null or undefined only if a schema
+// was never set for the database in the first place.
+App.prototype.noSqlDatabase = function(relativeName, schema) {
+  return new Database(this.fullName, relativeName, schema);
 };
 
 // listDatabases returns of all database names.
diff --git a/src/nosql/database.js b/src/nosql/database.js
index febf66f..a8fd7af 100644
--- a/src/nosql/database.js
+++ b/src/nosql/database.js
@@ -9,6 +9,7 @@
 // TODO(nlacasse): We should put unwrap and other type util methods on
 // vanadium.vdl object.
 var unwrap = require('vanadium/src/vdl/type-util').unwrap;
+var verror = vanadium.verror;
 
 var nosqlVdl = require('../gen-vdl/v.io/syncbase/v23/services/syncbase/nosql');
 var watchVdl = require('../gen-vdl/v.io/v23/services/watch');
@@ -28,7 +29,6 @@
  * @param {string} relativeName Relative name of this Database.  Must not
  * contain slashes.
  * @param {module:syncbase.schema.Schema} schema Schema for the database.
- * TODO(nlacasse): port definition of Schema from go to javascript
  */
 function Database(parentFullName, relativeName, schema) {
   if (!(this instanceof Database)) {
@@ -37,8 +37,18 @@
 
   util.addNameProperties(this, parentFullName, relativeName);
 
-  this.schema = null;  // TODO(nlacasse): use schema from params
-  this.schemaVersion = -1;  // TODO(nlacasse): derive this from schema
+  Object.defineProperty(this, 'schema', {
+    enumerable: false,
+    value: schema,
+    writable: false
+  });
+
+  Object.defineProperty(this, 'schemaVersion', {
+    enumerable: false,
+    value: schema ? schema.metadata.version : -1,
+    writable: false
+  });
+
   /**
    * Caches the database wire object.
    * @private
@@ -70,13 +80,13 @@
  * @param {module:vanadium.security.access.Permissions} perms Permissions for
  * the new database.  If perms is null, we inherit (copy) the App perms.
  * @param {function} cb Callback.
- *
- * TODO(nlacasse): Port schema changes to javascript code. See cl:
- *    https://vanadium-review.googlesource.com/#/c/13040/ .
  */
 Database.prototype.create = function(ctx, perms, cb) {
-  //TODO(nlacasse): pass schema.metadata below instead of null
-  this._wire(ctx).create(ctx, null, perms, cb);
+  var schemaMetadata = null;
+  if (this.schema) {
+    schemaMetadata = this.schema.metadata;
+  }
+  this._wire(ctx).create(ctx, schemaMetadata, perms, cb);
 };
 
 /**
@@ -370,3 +380,110 @@
 Database.prototype.getSyncGroupNames = function(ctx, cb) {
   this._wire(ctx).getSyncGroupNames(ctx, cb);
 };
+
+/**
+ * Compares the current schema version of the database with the schema version
+ * provided while creating this database handle. If the current database schema
+ * version is lower, then schema.updater is called. If schema.updater is
+ * successful this method stores the new schema metadata in database.
+ *
+ * It is important not to access or modify the database until upgradeIfOutdated
+ * has called its callback.
+ *
+ * TODO(nlacasse): Consider locking the database in some way so that the
+ * upgrader function can access it, but all other attempts fail immediately
+ * with a helpful error.
+ *
+ * Note: schema can be nil, in which case this method skips schema check and
+ * the caller is responsible for maintaining schema sanity.
+ * @param {module:vanadium.context.Context} ctx Vanadium context.
+ * @param {function} cb Callback.
+ */
+Database.prototype.upgradeIfOutdated = function(ctx, cb) {
+  var self = this;
+  if (!self.schema) {
+    return process.nextTick(function() {
+      cb(new verror.BadStateError(ctx,
+          'schema or schema.metadata cannot be nil.  ' +
+          'A valid schema needs to be used when creating Database handle.'));
+    });
+  }
+
+  if (self.schema.metadata.version < 0) {
+    return process.nextTick(function() {
+      cb(new verror.BadStateError(ctx,
+          'schema version cannot be less than zero'));
+    });
+  }
+
+  self._getSchemaMetadata(ctx, function(err, currMeta) {
+    if (err) {
+      if (!(err instanceof verror.NoExistError)) {
+        return cb(err);
+      }
+
+      // If the client app did not set a schema as part of create db
+      // getSchemaMetadata() will return a NoExistError. If so we set the
+      // schema here.
+      self._setSchemaMetadata(ctx, self.schema.metadata, function(err) {
+
+        // The database may not yet exist. If so above call will return
+        // NoExistError and we return db without error. If the error is
+        // different then return the error to the caller.
+        if (err && !(err instanceof verror.NoExistError)) {
+          return cb(err);
+        }
+        return cb(null, false);
+      });
+
+      return;
+    }
+
+    if (currMeta.version >= self.schema.metadata.version) {
+      return cb(null, false);
+    }
+
+    // Call the Upgrader provided by the app to upgrade the schema.
+    //
+    // TODO(nlacasse,jlodhia): disable sync before running Upgrader and
+    // reenable once Upgrader is finished.
+    //
+    // TODO(nlacasse,jlodhia): prevent other processes (local/remote) from
+    // accessing the database while upgrade is in progress.
+    self.schema.upgrader(self, currMeta.version, self.schema.metadata.version,
+        function(err) {
+      if (err) {
+        return cb(err);
+      }
+
+      // Update the schema metadata in db to the latest version.
+      self._setSchemaMetadata(ctx, self.schema.metadata, function(err) {
+        if (err) {
+          return cb(err);
+        }
+        cb(null, true);
+      });
+    });
+  });
+};
+
+/**
+ * Retrieves the schema metadata for the database.
+ * @private
+ * @param {module:vanadium.context.Context} ctx Vanadium context.
+ * @param {function} cb Callback.
+ */
+Database.prototype._getSchemaMetadata = function(ctx, cb) {
+  return this._wire(ctx).getSchemaMetadata(ctx, cb);
+};
+
+/**
+ * Stores the schema metadata for the database.
+ * @private
+ * @param {module:vanadium.context.Context} ctx Vanadium context.
+ * @param {module:syncbase.schema.SchemaMetadata} metadata Schema metadata.
+ * @param {function} cb Callback.
+ */
+Database.prototype._setSchemaMetadata = function(ctx, metadata, cb) {
+  return this._wire(ctx).setSchemaMetadata(ctx, metadata, cb);
+};
diff --git a/src/nosql/index.js b/src/nosql/index.js
index 5bbab69..5136d9f 100644
--- a/src/nosql/index.js
+++ b/src/nosql/index.js
@@ -4,6 +4,7 @@
 
 var rowrange = require('./rowrange');
 var runInBatch = require('./batch');
+var Schema = require('./schema');
 var vdl = require('../gen-vdl/v.io/syncbase/v23/services/syncbase/nosql');
 var watch = require('./watch');
 
@@ -20,6 +21,8 @@
   ResumeMarker: watch.ResumeMarker,
   rowrange: rowrange,
   runInBatch: runInBatch,
+  Schema: Schema,
+  SchemaMetadata: vdl.SchemaMetadata,
   SyncGroupMemberInfo: vdl.SyncGroupMemberInfo,
   SyncGroupSpec: vdl.SyncGroupSpec,
   WatchChange: watch.WatchChange
diff --git a/src/nosql/schema.js b/src/nosql/schema.js
new file mode 100644
index 0000000..b98f77a
--- /dev/null
+++ b/src/nosql/schema.js
@@ -0,0 +1,43 @@
+// Copyright 2015 The Vanadium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+module.exports = Schema;
+
+/**
+ * Each database has a Schema associated with it which defines the current
+ * version of the database. When a new version of app wishes to change its data
+ * in a way that it is not compatible with the old app's data, the app must
+ * change the schema version and provide relevant upgrade logic in the
+ * Upgrader. The conflict resolution rules are also associated with the schema
+ * version. Hence if the conflict resolution rules change then the schema
+ * version also must be bumped.
+ *
+ * Schema provides metadata and an upgrader for a given database.
+ *
+ * @constructor
+ * @param {module:syncbase.nosql.SchemaMetadata} metadata Schema metadata.
+ * @param {module:syncbase.nosql.Schema~upgrader} upgrader Upgrader function.
+ */
+function Schema(metadata, upgrader) {
+  Object.defineProperty(this, 'metadata', {
+    value: metadata,
+    writable: false,
+    enumerable: false
+  });
+
+  Object.defineProperty(this, 'upgrader', {
+    value: upgrader,
+    writable: false,
+    enumerable: false
+  });
+}
+
+/**
+ * Schema upgrader function.
+ * @callback module:syncbase.Schema~upgrader
+ * @param {module:syncbase.nosql.Database} db Database.
+ * @param {num} oldVersion Old version.
+ * @param {num} newVersion New version.
+ * @param {function} cb Callback to call when done.
+ */
diff --git a/test/integration/test-database.js b/test/integration/test-database.js
index 0182bfb..007c477 100644
--- a/test/integration/test-database.js
+++ b/test/integration/test-database.js
@@ -15,13 +15,18 @@
 var Database = require('../../src/nosql/database');
 var Table = require('../../src/nosql/table');
 
+var nosql = require('../..').nosql;
+var Schema = nosql.Schema;
+var SchemaMetadata = nosql.SchemaMetadata;
+
 var testUtil = require('./util');
 var setupApp = testUtil.setupApp;
 var setupDatabase = testUtil.setupDatabase;
 var setupTable = testUtil.setupTable;
 var uniqueName = testUtil.uniqueName;
 
-test('app.noSqlDatabase() returns a database', function(t) {
+test('app.noSqlDatabase(name) returns a database with correct name',
+     function(t) {
   setupApp(t, function(err, o) {
     if (err) {
       return t.end(err);
@@ -47,6 +52,30 @@
   });
 });
 
+test('app.noSqlDatabase(name, schema) returns a database with correct schema',
+     function(t) {
+  setupApp(t, function(err, o) {
+    if (err) {
+      return t.end(err);
+    }
+
+    var version = 123;
+    var md = new SchemaMetadata({version: version});
+    var updater = function() {};
+    var schema = new Schema(md, updater);
+
+    var dbName = uniqueName('db');
+    var db = o.app.noSqlDatabase(dbName, schema);
+
+    t.ok(db, 'Database is constructed.');
+    t.ok(db instanceof Database, 'database is a Database object.');
+    t.equal(db.schema, schema, 'database has correct schema.');
+    t.equal(db.schemaVersion, version, 'database has correct schemaVersion');
+
+    o.teardown(t.end);
+  });
+});
+
 test('app.noSqlDatabase with slashes in the name', function(t) {
   setupApp(t, function(err, o) {
     if (err) {
diff --git a/test/integration/test-schema.js b/test/integration/test-schema.js
new file mode 100644
index 0000000..299777c
--- /dev/null
+++ b/test/integration/test-schema.js
@@ -0,0 +1,107 @@
+// Copyright 2015 The Vanadium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+var async = require('async');
+var test = require('prova');
+
+var testUtil = require('./util');
+var setupApp = testUtil.setupApp;
+var uniqueName = testUtil.uniqueName;
+
+var nosql = require('../..').nosql;
+var Schema = nosql.Schema;
+var SchemaMetadata = nosql.SchemaMetadata;
+
+test('schema check', function(t) {
+  setupApp(t, function(err, o) {
+    if (err) {
+      return t.end(err);
+    }
+
+    var app = o.app;
+    var ctx = o.ctx;
+
+    var dbName = uniqueName('db');
+
+    var upgraderCallCount = 0;
+    var upgrader = function(db, oldVer, newVer, cb) {
+      upgraderCallCount++;
+      process.nextTick(function() {
+        cb(null);
+      });
+    };
+
+    var version = 123;
+    var md = new SchemaMetadata({version: version});
+    var schema = new Schema(md, upgrader);
+
+    var otherDb, otherSchema, newVersion;
+
+    var db = app.noSqlDatabase(dbName, schema);
+
+    async.waterfall([
+      // Verify that calling Upgrade on a non existing database does not throw
+      // errors.
+      db.upgradeIfOutdated.bind(db, ctx),
+      function(upgraded, cb) {
+        t.equal(upgraded, false, 'upgradeIfOutdated should return false');
+        t.equal(upgraderCallCount, 0,
+                'upgrader function should not have been called');
+        cb(null);
+      },
+
+      // Create db, this step also stores the schema provided above
+      db.create.bind(db, ctx, new Map()),
+      // Verify schema was stored as part of create.
+      function(cb) {
+        cb(null);
+      },
+
+      db._getSchemaMetadata.bind(db, ctx),
+
+      function(metadata, cb) {
+        t.equal(metadata.version, version, 'metadata has correct version');
+        cb(null);
+      },
+
+      // Make redundant call to Upgrade to verify that it is a no-op
+      db.upgradeIfOutdated.bind(db, ctx),
+      function(res, cb) {
+        t.notOk(res, 'upgradeIfOutdated should not return true');
+        t.equal(upgraderCallCount, 0,
+                'upgrader function should not have been called');
+        cb(null);
+      },
+
+      // Try to make a new database object for the same database but with an
+      // incremented schema version.
+      function(cb) {
+        newVersion = version + 1;
+        var otherMd = new SchemaMetadata({version: newVersion});
+        otherSchema = new Schema(otherMd, upgrader);
+        otherDb = app.noSqlDatabase(dbName, otherSchema);
+        otherDb.upgradeIfOutdated(ctx, cb);
+      },
+
+      function(res, cb) {
+        t.ok(res, 'otherDb.upgradeIfOutdated expected to return true');
+        t.equal(upgraderCallCount, 1, 'upgrader should have been called once');
+        cb(null);
+      },
+
+      // check if the contents of SchemaMetadata are correctly stored in the db.
+      function(cb) {
+        otherDb._getSchemaMetadata(ctx, cb);
+      },
+
+      function(metadata, cb) {
+        t.equal(metadata.version, newVersion, 'metadata has correct version');
+        cb(null);
+      }
+    ], function(err) {
+      t.error(err);
+      o.teardown(t.end);
+    });
+  });
+});