syncbase.js: Support for exec queries.

Change-Id: Ibdc4a20edfd2887d9979b03afdca64bd3ff0e318
diff --git a/package.json b/package.json
index 397ea95..df391fb 100644
--- a/package.json
+++ b/package.json
@@ -11,7 +11,9 @@
   "devDependencies": {
     "async": "~1.0.0",
     "deep-equal": "~1.0.0",
+    "format": "~0.2.1",
     "jshint": "~2.7.0",
+    "json-stable-stringify": "~1.0.0",
     "minimist": "~1.1.1",
     "mkdirp": "~0.5.1",
     "prova": "aghassemi/prova#0.0.4",
diff --git a/src/gen-vdl/v.io/syncbase/v23/services/syncbase/nosql/index.js b/src/gen-vdl/v.io/syncbase/v23/services/syncbase/nosql/index.js
index 494d9d6..18bc481 100644
--- a/src/gen-vdl/v.io/syncbase/v23/services/syncbase/nosql/index.js
+++ b/src/gen-vdl/v.io/syncbase/v23/services/syncbase/nosql/index.js
@@ -24,6 +24,7 @@
 var _type2 = new vdl.Type();
 var _type3 = new vdl.Type();
 var _type4 = new vdl.Type();
+var _type5 = new vdl.Type();
 var _typeBatchOptions = new vdl.Type();
 var _typeKeyValue = new vdl.Type();
 var _typePrefixPermissions = new vdl.Type();
@@ -38,16 +39,19 @@
 _type2.key = vdl.types.STRING;
 _type3.kind = vdl.kind.LIST;
 _type3.name = "";
-_type3.elem = vdl.types.BYTE;
+_type3.elem = vdl.types.ANY;
 _type4.kind = vdl.kind.LIST;
 _type4.name = "";
-_type4.elem = _typePrefixPermissions;
+_type4.elem = vdl.types.BYTE;
+_type5.kind = vdl.kind.LIST;
+_type5.name = "";
+_type5.elem = _typePrefixPermissions;
 _typeBatchOptions.kind = vdl.kind.STRUCT;
 _typeBatchOptions.name = "v.io/syncbase/v23/services/syncbase/nosql.BatchOptions";
 _typeBatchOptions.fields = [{name: "Hint", type: vdl.types.STRING}, {name: "ReadOnly", type: vdl.types.BOOL}];
 _typeKeyValue.kind = vdl.kind.STRUCT;
 _typeKeyValue.name = "v.io/syncbase/v23/services/syncbase/nosql.KeyValue";
-_typeKeyValue.fields = [{name: "Key", type: vdl.types.STRING}, {name: "Value", type: _type3}];
+_typeKeyValue.fields = [{name: "Key", type: vdl.types.STRING}, {name: "Value", type: _type4}];
 _typePrefixPermissions.kind = vdl.kind.STRUCT;
 _typePrefixPermissions.name = "v.io/syncbase/v23/services/syncbase/nosql.PrefixPermissions";
 _typePrefixPermissions.fields = [{name: "Prefix", type: vdl.types.STRING}, {name: "Perms", type: new access.Permissions()._type}];
@@ -61,6 +65,7 @@
 _type2.freeze();
 _type3.freeze();
 _type4.freeze();
+_type5.freeze();
 _typeBatchOptions.freeze();
 _typeKeyValue.freeze();
 _typePrefixPermissions.freeze();
@@ -387,6 +392,11 @@
 };
     
       
+Database.prototype.exec = function(ctx, serverCall, query) {
+  throw new Error('Method Exec not implemented');
+};
+    
+      
 Database.prototype.abort = function(ctx, serverCall) {
   throw new Error('Method Abort not implemented');
 };
@@ -526,6 +536,26 @@
     
       
     {
+    name: 'Exec',
+    doc: "// Exec executes a syncQL query and returns all results as specified by\n// in the query's select clause.  The returned stream reads\n// from a consistent snapshot taken at the time of the Exec RPC.",
+    inArgs: [{
+      name: 'query',
+      doc: "",
+      type: vdl.types.STRING
+    },
+    ],
+    outArgs: [],
+    inStream: null,
+    outStream: {
+      name: '',
+      doc: '',
+      type: _type3
+    },
+    tags: [canonicalize.reduce(new access.Tag("Read", true), new access.Tag()._type), ]
+  },
+    
+      
+    {
     name: 'Abort',
     doc: "// Abort notifies the server that any pending changes can be discarded.\n// It is not strictly required, but it may allow the server to release locks\n// or other resources sooner than if it was not called.\n// If this Database is not bound to a batch, Abort() will fail with\n// ErrNotBoundToBatch.",
     inArgs: [],
@@ -858,12 +888,12 @@
     inArgs: [{
       name: 'start',
       doc: "",
-      type: _type3
+      type: _type4
     },
     {
       name: 'limit',
       doc: "",
-      type: _type3
+      type: _type4
     },
     ],
     outArgs: [],
@@ -879,12 +909,12 @@
     inArgs: [{
       name: 'start',
       doc: "",
-      type: _type3
+      type: _type4
     },
     {
       name: 'limit',
       doc: "",
-      type: _type3
+      type: _type4
     },
     ],
     outArgs: [],
@@ -931,7 +961,7 @@
     outArgs: [{
       name: '',
       doc: "",
-      type: _type4
+      type: _type5
     },
     ],
     inStream: null,
@@ -996,7 +1026,7 @@
     outArgs: [{
       name: '',
       doc: "",
-      type: _type3
+      type: _type4
     },
     ],
     inStream: null,
@@ -1011,7 +1041,7 @@
     inArgs: [{
       name: 'value',
       doc: "",
-      type: _type3
+      type: _type4
     },
     ],
     outArgs: [],
diff --git a/src/nosql/batch-database.js b/src/nosql/batch-database.js
index 5f1b130..7fac2d6 100644
--- a/src/nosql/batch-database.js
+++ b/src/nosql/batch-database.js
@@ -64,3 +64,20 @@
 BatchDatabase.prototype.abort = function(ctx, cb) {
   this._db._wire(ctx).abort(ctx, cb);
 };
+
+/**
+ * Executes a syncQL query.
+ *
+ * Returns a stream of rows.  The first row contains an array of headers (i.e.
+ * column names).  Subsequent rows contain an array of values for each row that
+ * matches the query.  The number of values returned in each row will match the
+ * size of the headers array.
+ *
+ * @param {module:vanadium.context.Context} ctx Vanadium context.
+ * @param {string} query Query string.
+ * @param {function} cb Callback.
+ * @returns {stream} Stream of rows.
+ */
+BatchDatabase.prototype.exec = function(ctx, query, cb) {
+  this._db.exec(ctx, query, cb);
+};
diff --git a/src/nosql/database.js b/src/nosql/database.js
index 1329135..85b44e9 100644
--- a/src/nosql/database.js
+++ b/src/nosql/database.js
@@ -4,7 +4,11 @@
 
 module.exports = Database;
 
+var through2 = require('through2');
 var vanadium = require('vanadium');
+// TODO(nlacasse): We should put unwrap and other type util methods on
+// vanadium.vdl object.
+var unwrap = require('vanadium/src/vdl/type-util').unwrap;
 
 var BatchDatabase = require('./batch-database');
 var nosqlVdl = require('../gen-vdl/v.io/syncbase/v23/services/syncbase/nosql');
@@ -74,6 +78,50 @@
 };
 
 /**
+ * Executes a syncQL query.
+ *
+ * Returns a stream of rows.  The first row contains an array of headers (i.e.
+ * column names).  Subsequent rows contain an array of values for each row that
+ * matches the query.  The number of values returned in each row will match the
+ * size of the headers array.
+ *
+ * NOTE(nlacasse): The Go client library returns the headers seperately from
+ * the stream.  We could potentially do something similar in JavaScript, by
+ * pulling the headers off the stream and passing them to the callback.
+ * However, by Vanadium JS convention the callback gets called at the *end* of
+ * the RPC, so a developer would have to wait for the stream to finish before
+ * seeing what the headers are, which is not ideal.  We also cannot return the
+ * headers directly because reading from the stream is async.
+ *
+ * TODO(nlacasse): Syncbase queries don't work on values that were put without
+ * type information.  When JavaScript encodes values with no type infomation,
+ * it uses "vdl.Value" for the type.  Presumably, syncbase does not know how to
+ * decode such objects, so queries that involve inspecting the object or its
+ * type don't work.
+ *
+ * @param {module:vanadium.context.Context} ctx Vanadium context.
+ * @param {string} query Query string.
+ * @param {function} cb Callback.
+ * @returns {stream} Stream of rows.
+ */
+Database.prototype.exec = function(ctx, query, cb) {
+  var streamUnwrapper = through2({
+    objectMode: true
+  }, function(res, enc, cb) {
+    return cb(null, res.map(unwrap));
+  });
+
+  var stream = this._wire(ctx).exec(ctx, query, cb).stream;
+
+  var decodedStream = stream.pipe(streamUnwrapper);
+  stream.on('error', function(err) {
+    decodedStream.emit('error', err);
+  });
+
+  return decodedStream;
+};
+
+/**
  * Returns the Table with the given name.
  * @param {string} relativeName Table name.  Must not contain slashes.
  * @return {module:syncbase.table.Table} Table object.
diff --git a/src/nosql/row.js b/src/nosql/row.js
index ed94e2a..a748c7d 100644
--- a/src/nosql/row.js
+++ b/src/nosql/row.js
@@ -92,14 +92,20 @@
  * Writes the given value for this Row.
  * @param {module:vanadium.context.Context} ctx Vanadium context.
  * @param {*} value Value to write.
+ * @param {module:vanadium.vdl.Type} [type] Type of value.
  * @param {function} cb Callback.
  */
-Row.prototype.put = function(ctx, value, cb) {
+Row.prototype.put = function(ctx, value, type, cb) {
+  if (typeof cb === 'undefined' && typeof type === 'function') {
+    cb = type;
+    type = undefined;
+  }
+
   // NOTE(aghassemi) Currently server side does not want to encode for
   // performance reasons, so encoding/decoding is happening on the client side.
   var encodedVal;
   try {
-    encodedVal = vanadium.vom.encode(value);
+    encodedVal = vanadium.vom.encode(value, type);
   } catch (e) {
     return cb(e);
   }
@@ -113,4 +119,4 @@
  */
 Row.prototype.delete = function(ctx, cb) {
   this._wire(ctx).delete(ctx, cb);
-};
\ No newline at end of file
+};
diff --git a/src/nosql/table.js b/src/nosql/table.js
index fd44896..8e46169 100644
--- a/src/nosql/table.js
+++ b/src/nosql/table.js
@@ -80,13 +80,19 @@
 /**
  * Put writes the given value to this Table. The value's primary key field
  * must be set.
+ *
+ * Note that if you want to sync data with a Go syncbase client, or if you want
+ * to use syncbase queries, you must either specify the type of the value, or
+ * use a vdl value that includes its type.
+ *
  * @param {module:vanadium.context.Context} ctx Vanadium context.
  * @param {string} key Primary key of the row.
  * @param {*} value Value to put in the row.
+ * @param {module:vanadium.vdl.Type} [type] Type of value.
  * @param {function} cb Callback.
  */
-Table.prototype.put = function(ctx, key, value, cb) {
-  this.row(key).put(ctx, value, cb);
+Table.prototype.put = function(ctx, key, value, type, cb) {
+  this.row(key).put(ctx, value, type, cb);
 };
 
 /**
@@ -107,6 +113,7 @@
  * @param {module:vanadium.context.Context} ctx Vanadium context.
  * @param {module:syncbase.nosql.rowrange.RowRange} range Row ranges to scan.
  * @param {function} cb Callback.
+ * @returns {stream} Stream of row objects.
  */
 Table.prototype.scan = function(ctx, range, cb) {
   var vomStreamDecoder = through2({
diff --git a/test/integration/test-database.js b/test/integration/test-database.js
index ad3b4c2..9bba2ee 100644
--- a/test/integration/test-database.js
+++ b/test/integration/test-database.js
@@ -2,9 +2,15 @@
 // Use of this source code is governed by a BSD-style
 // license that can be found in the LICENSE file.
 
-var naming = require('vanadium').naming;
+var async = require('async');
+var format = require('format');
+var stringify = require('json-stable-stringify');
 var test = require('prova');
+var toArray = require('stream-to-array');
+
 var vanadium = require('vanadium');
+var naming = vanadium.naming;
+var vdl = vanadium.vdl;
 
 var Database = require('../../src/nosql/database');
 var Table = require('../../src/nosql/table');
@@ -14,6 +20,7 @@
 var tableExists = testUtil.tableExists;
 var setupApp = testUtil.setupApp;
 var setupDatabase = testUtil.setupDatabase;
+var setupTable = testUtil.setupTable;
 var uniqueName = testUtil.uniqueName;
 
 test('app.noSqlDatabase() returns a database', function(t) {
@@ -252,3 +259,247 @@
     });
   });
 });
+
+test('database.exec', function(t) {
+  setupTable(t, function(err, o) {
+    if (err) {
+      return t.end(err);
+    }
+
+    var ctx = o.ctx;
+    var db = o.database;
+    var table = o.table;
+
+    var personType = new vdl.Type({
+      kind: vdl.kind.STRUCT,
+      name: 'personType',
+      fields: [
+        {
+          name: 'first',
+          type: vdl.types.STRING
+        },
+        {
+          name: 'last',
+          type: vdl.types.STRING
+        },
+        {
+          name: 'employed',
+          type: vdl.types.BOOL
+        },
+        {
+          name: 'age',
+          type: vdl.types.INT32
+        }
+      ]
+    });
+
+    var homer = {
+      first: 'Homer',
+      last: 'Simpson',
+      employed: true,
+      age: 38
+    };
+
+    var bart = {
+      first: 'Bart',
+      last: 'Simpson',
+      employed: false,
+      age: 10
+    };
+
+    var maggie = {
+      first: 'Maggie',
+      last: 'Simpson',
+      employed: false,
+      age: 1
+    };
+
+    var moe = {
+      first: 'Moe',
+      last: 'Syzlak',
+      employed: true,
+      age: 46
+    };
+
+    var people = [homer, bart, maggie, moe];
+
+    var cityType = new vdl.Type({
+      kind: vdl.kind.STRUCT,
+      name: 'cityType',
+      fields: [
+        {
+          name: 'name',
+          type: vdl.types.STRING
+        },
+        {
+          name: 'population',
+          type: vdl.types.INT32
+        },
+        {
+          name: 'age',
+          type: vdl.types.INT32
+        }
+      ]
+    });
+
+    var springfield = {
+      name: 'Springfield',
+      population: 30720,
+      age: 219
+    };
+
+    var shelbyville = {
+      name: 'Shelbyville',
+      population: 600000,
+      age: 220
+    };
+
+    var cities = [springfield, shelbyville];
+
+    var testCases = [
+      {
+        q: 'select k, v from %s',
+        want: [
+          ['k', 'v'],
+          ['Homer', homer],
+          ['Bart', bart],
+          ['Moe', moe],
+          ['Maggie', maggie],
+          ['Springfield', springfield],
+          ['Shelbyville', shelbyville]
+        ]
+      },
+      {
+        q: 'select k, v.Age from %s',
+        want: [
+          ['k', 'v.Age'],
+          ['Homer', homer.age],
+          ['Bart', bart.age],
+          ['Moe', moe.age],
+          ['Maggie', maggie.age],
+          ['Springfield', springfield.age],
+          ['Shelbyville', shelbyville.age]
+        ]
+      },
+      {
+        q: 'select k, v.First from %s where t = "personType"',
+        want: [
+          ['k', 'v.First'],
+          ['Homer', homer.first],
+          ['Bart', bart.first],
+          ['Moe', moe.first],
+          ['Maggie', maggie.first]
+        ]
+      },
+      {
+        q: 'select k, v.Population from %s where t = "cityType"',
+        want: [
+          ['k', 'v.Population'],
+          ['Shelbyville', shelbyville.population],
+          ['Springfield', springfield.population],
+        ]
+      },
+      {
+        q: 'select k, v from %s where v.Age = 10',
+        want: [
+          ['k', 'v'],
+          ['Bart', bart]
+        ]
+      },
+      {
+        q: 'select k, v from %s where k = "Homer"',
+        want: [
+          ['k', 'v'],
+          ['Homer', homer],
+        ]
+      },
+      {
+        // Note the double-percent below. The query is passed through 'format'
+        // to insert the table name. The double %% will be replaced with a
+        // single %.
+        q: 'select k, v from %s where k like "M%%"',
+        want: [
+          ['k', 'v'],
+          ['Moe', moe],
+          ['Maggie', maggie],
+        ]
+      },
+      {
+        q: 'select k, v from %s where v.Employed = true',
+        want: [
+          ['k', 'v'],
+          ['Homer', homer],
+          ['Moe', moe],
+        ]
+      },
+    ];
+
+    putPeople();
+
+    // Put all people, keyed by their first name.
+    function putPeople() {
+      async.forEach(people, function(person, cb) {
+        table.put(ctx, person.first, person, personType, cb);
+      }, putCities);
+    }
+
+    // Put all cities, keyed by their name.
+    function putCities(err) {
+      if (err) {
+        return end(err);
+      }
+
+      async.forEach(cities, function(city, cb) {
+        table.put(ctx, city.name, city, cityType, cb);
+      }, runTestCases);
+    }
+
+    // Check all the test cases.
+    function runTestCases(err) {
+      if (err) {
+        return end(err);
+      }
+
+      async.forEachSeries(testCases, function(testCase, cb) {
+        assertExec(format(testCase.q, table.name), testCase.want, cb);
+      }, end);
+    }
+
+    function end(err) {
+      t.error(err);
+      o.teardown(t.end);
+    }
+
+    // Assert that query 'q' returns the rows in 'want'.
+    function assertExec(q, want, cb) {
+      var stream = db.exec(ctx, q, function(err) {
+        t.error(err);
+        cb();
+      });
+      stream.on('error', t.error);
+      toArray(stream, function(err, got) {
+        t.error(err);
+        got.sort(arrayCompare);
+        want.sort(arrayCompare);
+
+        var msg = 'query: "' + q + '" returns the correct values';
+        t.deepEqual(got, want, msg);
+      });
+    }
+  });
+});
+
+// Compare two arrays by json-encoding all items, then joining and treating it
+// as string.  Used to sort an array of arrays deterministically.
+function arrayCompare(a1, a2) {
+  var a1s = a1.map(stringify).join('/');
+  var a2s = a2.map(stringify).join('/');
+
+  if (a1s <= a2s) {
+    return -1;
+  }
+  if (a1s >= a2s) {
+    return 1;
+  }
+  return 0;
+}