Client support for Blobs.

Change-Id: I702dd561991fb3adfed1d2543672c96cbd3381cf
diff --git a/package.json b/package.json
index df391fb..4a86cd2 100644
--- a/package.json
+++ b/package.json
@@ -18,6 +18,7 @@
     "mkdirp": "~0.5.1",
     "prova": "aghassemi/prova#0.0.4",
     "run-parallel": "~1.1.1",
+    "stream-array": "~1.1.0",
     "stream-to-array": "~2.0.2",
     "tap-xunit": "~1.1.1",
     "which": "~1.1.1",
diff --git a/src/app.js b/src/app.js
index 0b4c536..ba39037 100644
--- a/src/app.js
+++ b/src/app.js
@@ -19,6 +19,10 @@
 
   util.addNameProperties(this, parentFullName, relativeName);
 
+  // TODO(nlacasse): Use the prr module to simplify all the
+  // 'Object.defineProperty' calls scattered throughout the project.
+  // https://www.npmjs.com/package/prr
+
   /**
    * Caches the database wire object.
    * @private
diff --git a/src/nosql/blob.js b/src/nosql/blob.js
new file mode 100644
index 0000000..c406165
--- /dev/null
+++ b/src/nosql/blob.js
@@ -0,0 +1,122 @@
+// Copyright 2015 The Vanadium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+module.exports = Blob;
+
+function Blob(db, blobRef) {
+  if (!(this instanceof Blob)) {
+    return new Blob(db, blobRef);
+  }
+
+  /**
+   * @private
+   */
+  Object.defineProperty(this, '_db', {
+    enumerable: false,
+    value: db,
+    writable: false
+  });
+
+  /**
+   * @property ref
+   * @type {string}
+   */
+  Object.defineProperty(this, 'ref', {
+    enumerable: true,
+    value: blobRef,
+    writable: false
+  });
+}
+
+/**
+ * Appends the byte stream to the blob.
+ * @param {module:vanadium.context.Context} ctx Vanadium context.
+ * @param {function} cb Callback.
+ * @returns {Stream<Uint8Array>} Stream of bytes to append to blob.
+ */
+Blob.prototype.put = function(ctx, cb) {
+  return this._db._wire(ctx).putBlob(ctx, this.ref, cb).stream;
+};
+
+/**
+ * Marks the blob as immutable.
+ * @param {module:vanadium.context.Context} ctx Vanadium context.
+ * @param {function} cb Callback.
+ */
+Blob.prototype.commit = function(ctx, cb) {
+  this._db._wire(ctx).commitBlob(ctx, this.ref, cb);
+};
+
+/**
+ * Gets the count of bytes written as part of the blob (committed or
+ * uncommitted).
+ * @param {module:vanadium.context.Context} ctx Vanadium context.
+ * @param {function} cb Callback.
+ */
+Blob.prototype.size = function(ctx, cb) {
+  this._db._wire(ctx).getBlobSize(ctx, this.ref, cb);
+};
+
+/**
+ * Locally deletes the blob (committed or uncommitted).
+ * @param {module:vanadium.context.Context} ctx Vanadium context.
+ * @param {function} cb Callback.
+ */
+Blob.prototype.delete = function(ctx, cb) {
+  this._db._wire(ctx).deleteBlob(ctx, this.ref, cb);
+};
+
+/**
+ * Returns the byte stream from a committed blob starting at offset.
+ * @param {module:vanadium.context.Context} ctx Vanadium context.
+ * @param {number} offset Offset in bytes.
+ * @param {function} cb Callback.
+ * @returns {Stream<Uint8Array>} Stream of blob bytes.
+ */
+Blob.prototype.get = function(ctx, offset, cb) {
+  return this._db._wire(ctx).getBlob(ctx, this.ref, offset, cb).stream;
+};
+
+/**
+ * Initiates fetching a blob if not locally found, priority controls the
+ * network priority of the blob.  Higher priority blobs are fetched before the
+ * lower priority ones.  However an ongoing blob transfer is not interrupted.
+ * Status updates are streamed back to the client as fetch is in progress.
+ * @param {module:vanadium.context.Context} ctx Vanadium context.
+ * @param {number} priority Priority.
+ * @param {function} cb Callback.
+ * @returns {Stream<BlobFetchStatus>} Stream of blob statuses.
+ */
+Blob.prototype.fetch = function(ctx, priority, cb) {
+  return this._db._wire(ctx).fetchBlob(ctx, this.ref, priority, cb).stream;
+};
+
+/**
+ * Locally pins the blob so that it is not evicted.
+ * @param {module:vanadium.context.Context} ctx Vanadium context.
+ * @param {function} cb Callback.
+ */
+Blob.prototype.pin = function(ctx, cb) {
+  this._db._wire(ctx).pinBlob(ctx, this.ref, cb);
+};
+
+/**
+ * Locally unpins the blob so that it can be evicted if needed.
+ * @param {module:vanadium.context.Context} ctx Vanadium context.
+ * @param {function} cb Callback.
+ */
+Blob.prototype.unpin = function(ctx, cb) {
+  this._db._wire(ctx).unpinBlob(ctx, this.ref, cb);
+};
+
+/**
+ * Locally caches the blob with the specified rank.  Lower ranked blobs are
+ * more eagerly evicted.
+ * @param {module:vanadium.context.Context} ctx Vanadium context.
+ * @param {number} rank Rank of blob.
+ * @param {function} cb Callback.
+ */
+Blob.prototype.keep = function(ctx, rank, cb) {
+  this._db._wire(ctx).keepBlob(ctx, this.ref, rank, cb);
+};
diff --git a/src/nosql/database.js b/src/nosql/database.js
index a8fd7af..ce2c35f 100644
--- a/src/nosql/database.js
+++ b/src/nosql/database.js
@@ -15,6 +15,10 @@
 var watchVdl = require('../gen-vdl/v.io/v23/services/watch');
 
 var BatchDatabase = require('./batch-database');
+/* jshint -W079 */
+// Silence jshint's error about redefining 'Blob'.
+var Blob = require('./blob');
+/* jshint +W079 */
 var SyncGroup = require('./syncgroup');
 var Table = require('./table');
 var util = require('../util');
@@ -487,3 +491,29 @@
 Database.prototype._setSchemaMetadata = function(ctx, metadata, cb) {
   return this._wire(ctx).setSchemaMetadata(ctx, metadata, cb);
 };
+
+/**
+ * Returns a handle to the blob with the given blobRef.
+ * @param {module:syncbase.nosql.BlobRef} blobRef BlobRef of blob to get.
+ *
+ */
+Database.prototype.blob = function(blobRef) {
+  return new Blob(this, blobRef);
+};
+
+/**
+ * Creates a new blob.
+ * @param {module:vanadium.context.Context} ctx Vanadium context.
+ * @param {function} cb Callback.
+ *
+ */
+Database.prototype.createBlob = function(ctx, cb) {
+  var self = this;
+  this._wire(ctx).createBlob(ctx, function(err, blobRef) {
+    if (err) {
+      return cb(err);
+    }
+    return cb(null, new Blob(self, blobRef));
+  });
+};
+
diff --git a/src/nosql/index.js b/src/nosql/index.js
index 5136d9f..469e1ed 100644
--- a/src/nosql/index.js
+++ b/src/nosql/index.js
@@ -17,6 +17,7 @@
  */
 module.exports = {
   BatchOptions: vdl.BatchOptions,
+  BlobRef: vdl.BlobRef,
   ReadOnlyBatchError: vdl.ReadOnlyBatchError,
   ResumeMarker: watch.ResumeMarker,
   rowrange: rowrange,
diff --git a/test/integration/test-blob.js b/test/integration/test-blob.js
new file mode 100644
index 0000000..d1aee34
--- /dev/null
+++ b/test/integration/test-blob.js
@@ -0,0 +1,150 @@
+// Copyright 2015 The Vanadium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+var arrayToStream = require('stream-array');
+var streamToArray = require('stream-to-array');
+var test = require('prova');
+
+/* jshint -W079 */
+// Silence jshint's error about redefining 'Blob'.
+var Blob = require('../../src/nosql/blob');
+/* jshint +W079 */
+var vdl =
+  require('../../src/gen-vdl/v.io/syncbase/v23/services/syncbase/nosql');
+
+var testUtil = require('./util');
+var setupDatabase = testUtil.setupDatabase;
+var uniqueName = testUtil.uniqueName;
+
+test('db.blob returns the correct blob', function(t) {
+  setupDatabase(t, function(err, o) {
+    if (err) {
+      return t.end(err);
+    }
+
+    var blobRef = uniqueName('blobRef');
+    var blob = o.database.blob(blobRef);
+
+    t.ok(blob instanceof Blob);
+    t.equals(blob.ref, blobRef);
+
+    o.teardown(t.end);
+  });
+});
+
+// Tests local blob get before and after a put.
+test('blob put then get', function(t) {
+  setupDatabase(t, function(err, o) {
+    if (err) {
+      return t.end(err);
+    }
+
+    var ctx = o.ctx;
+    var db = o.database;
+
+    var blob;
+
+    var data = new Uint8Array(new ArrayBuffer(256));
+
+    db.createBlob(ctx, function(err, _blob) {
+      if (err) {
+        return end(err);
+      }
+
+      blob = _blob;
+      t.ok(blob instanceof Blob, 'createBlob returns a new blob');
+      t.equals(typeof blob.ref, 'string', 'blob has blobRef');
+
+      getEmptyBlob();
+    });
+
+    function getEmptyBlob() {
+      var blobStream = blob.get(ctx, 0, function(err) {
+        t.ok(err instanceof vdl.BlobNotCommittedError,
+             'blob.get should fail for uncommitted blobs');
+      });
+
+      streamToArray(blobStream, function(err) {
+        t.ok(err instanceof vdl.BlobNotCommittedError,
+             'blob.get should fail for uncommitted blobs');
+        fetchEmptyBlob();
+      });
+    }
+
+    function fetchEmptyBlob() {
+      var blobStatusStream = blob.fetch(ctx, 100, function(err) {
+        t.ok(err instanceof vdl.BlobNotCommittedError,
+             'blob.fetch should fail for uncommitted blobs');
+      });
+
+      streamToArray(blobStatusStream, function(err) {
+        t.ok(err instanceof vdl.BlobNotCommittedError,
+             'blob status stream should fail for uncommitted blobs');
+        assertBlobIsEmpty();
+      });
+    }
+
+    function assertBlobIsEmpty() {
+      blob.size(ctx, function(err, size) {
+        if (err) {
+          return end(err);
+        }
+        t.equals(size.toNativeNumber(), 0, 'blob is empty');
+
+        putToBlob();
+      });
+    }
+
+    function putToBlob() {
+      var byteStream = blob.put(ctx, function(err) {
+        if (err) {
+          return t.end(err);
+        }
+
+        assertBlobSize();
+      });
+
+      arrayToStream([data]).pipe(byteStream);
+    }
+
+    function assertBlobSize() {
+      blob.size(ctx, function(err, size) {
+        if (err) {
+          return end(err);
+        }
+        t.equals(size.toNativeNumber(), data.length, 'blob has correct size');
+
+        commitBlob();
+      });
+    }
+
+    function commitBlob() {
+      blob.commit(ctx, function(err) {
+        if (err) {
+          return end(err);
+        }
+
+        assertGetBlob();
+      });
+    }
+
+    function assertGetBlob() {
+      var blobStream = blob.get(ctx, 0, t.error);
+
+      streamToArray(blobStream, function(err, gotData) {
+        if (err) {
+          return end(err);
+        }
+
+        t.deepEquals(gotData, [data], 'blob has correct data');
+        end();
+      });
+    }
+
+    function end(err) {
+      t.error(err);
+      o.teardown(t.end);
+    }
+  });
+});