Client support for Blobs.
Change-Id: I702dd561991fb3adfed1d2543672c96cbd3381cf
diff --git a/package.json b/package.json
index df391fb..4a86cd2 100644
--- a/package.json
+++ b/package.json
@@ -18,6 +18,7 @@
"mkdirp": "~0.5.1",
"prova": "aghassemi/prova#0.0.4",
"run-parallel": "~1.1.1",
+ "stream-array": "~1.1.0",
"stream-to-array": "~2.0.2",
"tap-xunit": "~1.1.1",
"which": "~1.1.1",
diff --git a/src/app.js b/src/app.js
index 0b4c536..ba39037 100644
--- a/src/app.js
+++ b/src/app.js
@@ -19,6 +19,10 @@
util.addNameProperties(this, parentFullName, relativeName);
+ // TODO(nlacasse): Use the prr module to simplify all the
+ // 'Object.defineProperty' calls scattered throughout the project.
+ // https://www.npmjs.com/package/prr
+
/**
* Caches the database wire object.
* @private
diff --git a/src/nosql/blob.js b/src/nosql/blob.js
new file mode 100644
index 0000000..c406165
--- /dev/null
+++ b/src/nosql/blob.js
@@ -0,0 +1,122 @@
+// Copyright 2015 The Vanadium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+module.exports = Blob;
+
+function Blob(db, blobRef) {
+ if (!(this instanceof Blob)) {
+ return new Blob(db, blobRef);
+ }
+
+ /**
+ * @private
+ */
+ Object.defineProperty(this, '_db', {
+ enumerable: false,
+ value: db,
+ writable: false
+ });
+
+ /**
+ * @property ref
+ * @type {string}
+ */
+ Object.defineProperty(this, 'ref', {
+ enumerable: true,
+ value: blobRef,
+ writable: false
+ });
+}
+
+/**
+ * Appends the byte stream to the blob.
+ * @param {module:vanadium.context.Context} ctx Vanadium context.
+ * @param {function} cb Callback.
+ * @returns {Stream<Uint8Array>} Stream of bytes to append to blob.
+ */
+Blob.prototype.put = function(ctx, cb) {
+ return this._db._wire(ctx).putBlob(ctx, this.ref, cb).stream;
+};
+
+/**
+ * Marks the blob as immutable.
+ * @param {module:vanadium.context.Context} ctx Vanadium context.
+ * @param {function} cb Callback.
+ */
+Blob.prototype.commit = function(ctx, cb) {
+ this._db._wire(ctx).commitBlob(ctx, this.ref, cb);
+};
+
+/**
+ * Gets the count of bytes written as part of the blob (committed or
+ * uncommitted).
+ * @param {module:vanadium.context.Context} ctx Vanadium context.
+ * @param {function} cb Callback.
+ */
+Blob.prototype.size = function(ctx, cb) {
+ this._db._wire(ctx).getBlobSize(ctx, this.ref, cb);
+};
+
+/**
+ * Locally deletes the blob (committed or uncommitted).
+ * @param {module:vanadium.context.Context} ctx Vanadium context.
+ * @param {function} cb Callback.
+ */
+Blob.prototype.delete = function(ctx, cb) {
+ this._db._wire(ctx).deleteBlob(ctx, this.ref, cb);
+};
+
+/**
+ * Returns the byte stream from a committed blob starting at offset.
+ * @param {module:vanadium.context.Context} ctx Vanadium context.
+ * @param {number} offset Offset in bytes.
+ * @param {function} cb Callback.
+ * @returns {Stream<Uint8Array>} Stream of blob bytes.
+ */
+Blob.prototype.get = function(ctx, offset, cb) {
+ return this._db._wire(ctx).getBlob(ctx, this.ref, offset, cb).stream;
+};
+
+/**
+ * Initiates fetching a blob if not locally found, priority controls the
+ * network priority of the blob. Higher priority blobs are fetched before the
+ * lower priority ones. However an ongoing blob transfer is not interrupted.
+ * Status updates are streamed back to the client as fetch is in progress.
+ * @param {module:vanadium.context.Context} ctx Vanadium context.
+ * @param {number} priority Priority.
+ * @param {function} cb Callback.
+ * @returns {Stream<BlobFetchStatus>} Stream of blob statuses.
+ */
+Blob.prototype.fetch = function(ctx, priority, cb) {
+ return this._db._wire(ctx).fetchBlob(ctx, this.ref, priority, cb).stream;
+};
+
+/**
+ * Locally pins the blob so that it is not evicted.
+ * @param {module:vanadium.context.Context} ctx Vanadium context.
+ * @param {function} cb Callback.
+ */
+Blob.prototype.pin = function(ctx, cb) {
+ this._db._wire(ctx).pinBlob(ctx, this.ref, cb);
+};
+
+/**
+ * Locally unpins the blob so that it can be evicted if needed.
+ * @param {module:vanadium.context.Context} ctx Vanadium context.
+ * @param {function} cb Callback.
+ */
+Blob.prototype.unpin = function(ctx, cb) {
+ this._db._wire(ctx).unpinBlob(ctx, this.ref, cb);
+};
+
+/**
+ * Locally caches the blob with the specified rank. Lower ranked blobs are
+ * more eagerly evicted.
+ * @param {module:vanadium.context.Context} ctx Vanadium context.
+ * @param {number} rank Rank of blob.
+ * @param {function} cb Callback.
+ */
+Blob.prototype.keep = function(ctx, rank, cb) {
+ this._db._wire(ctx).keepBlob(ctx, this.ref, rank, cb);
+};
diff --git a/src/nosql/database.js b/src/nosql/database.js
index a8fd7af..ce2c35f 100644
--- a/src/nosql/database.js
+++ b/src/nosql/database.js
@@ -15,6 +15,10 @@
var watchVdl = require('../gen-vdl/v.io/v23/services/watch');
var BatchDatabase = require('./batch-database');
+/* jshint -W079 */
+// Silence jshint's error about redefining 'Blob'.
+var Blob = require('./blob');
+/* jshint +W079 */
var SyncGroup = require('./syncgroup');
var Table = require('./table');
var util = require('../util');
@@ -487,3 +491,29 @@
Database.prototype._setSchemaMetadata = function(ctx, metadata, cb) {
return this._wire(ctx).setSchemaMetadata(ctx, metadata, cb);
};
+
+/**
+ * Returns a handle to the blob with the given blobRef.
+ * @param {module:syncbase.nosql.BlobRef} blobRef BlobRef of blob to get.
+ *
+ */
+Database.prototype.blob = function(blobRef) {
+ return new Blob(this, blobRef);
+};
+
+/**
+ * Creates a new blob.
+ * @param {module:vanadium.context.Context} ctx Vanadium context.
+ * @param {function} cb Callback.
+ *
+ */
+Database.prototype.createBlob = function(ctx, cb) {
+ var self = this;
+ this._wire(ctx).createBlob(ctx, function(err, blobRef) {
+ if (err) {
+ return cb(err);
+ }
+ return cb(null, new Blob(self, blobRef));
+ });
+};
+
diff --git a/src/nosql/index.js b/src/nosql/index.js
index 5136d9f..469e1ed 100644
--- a/src/nosql/index.js
+++ b/src/nosql/index.js
@@ -17,6 +17,7 @@
*/
module.exports = {
BatchOptions: vdl.BatchOptions,
+ BlobRef: vdl.BlobRef,
ReadOnlyBatchError: vdl.ReadOnlyBatchError,
ResumeMarker: watch.ResumeMarker,
rowrange: rowrange,
diff --git a/test/integration/test-blob.js b/test/integration/test-blob.js
new file mode 100644
index 0000000..d1aee34
--- /dev/null
+++ b/test/integration/test-blob.js
@@ -0,0 +1,150 @@
+// Copyright 2015 The Vanadium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+var arrayToStream = require('stream-array');
+var streamToArray = require('stream-to-array');
+var test = require('prova');
+
+/* jshint -W079 */
+// Silence jshint's error about redefining 'Blob'.
+var Blob = require('../../src/nosql/blob');
+/* jshint +W079 */
+var vdl =
+ require('../../src/gen-vdl/v.io/syncbase/v23/services/syncbase/nosql');
+
+var testUtil = require('./util');
+var setupDatabase = testUtil.setupDatabase;
+var uniqueName = testUtil.uniqueName;
+
+test('db.blob returns the correct blob', function(t) {
+ setupDatabase(t, function(err, o) {
+ if (err) {
+ return t.end(err);
+ }
+
+ var blobRef = uniqueName('blobRef');
+ var blob = o.database.blob(blobRef);
+
+ t.ok(blob instanceof Blob);
+ t.equals(blob.ref, blobRef);
+
+ o.teardown(t.end);
+ });
+});
+
+// Tests local blob get before and after a put.
+test('blob put then get', function(t) {
+ setupDatabase(t, function(err, o) {
+ if (err) {
+ return t.end(err);
+ }
+
+ var ctx = o.ctx;
+ var db = o.database;
+
+ var blob;
+
+ var data = new Uint8Array(new ArrayBuffer(256));
+
+ db.createBlob(ctx, function(err, _blob) {
+ if (err) {
+ return end(err);
+ }
+
+ blob = _blob;
+ t.ok(blob instanceof Blob, 'createBlob returns a new blob');
+ t.equals(typeof blob.ref, 'string', 'blob has blobRef');
+
+ getEmptyBlob();
+ });
+
+ function getEmptyBlob() {
+ var blobStream = blob.get(ctx, 0, function(err) {
+ t.ok(err instanceof vdl.BlobNotCommittedError,
+ 'blob.get should fail for uncommitted blobs');
+ });
+
+ streamToArray(blobStream, function(err) {
+ t.ok(err instanceof vdl.BlobNotCommittedError,
+ 'blob.get should fail for uncommitted blobs');
+ fetchEmptyBlob();
+ });
+ }
+
+ function fetchEmptyBlob() {
+ var blobStatusStream = blob.fetch(ctx, 100, function(err) {
+ t.ok(err instanceof vdl.BlobNotCommittedError,
+ 'blob.fetch should fail for uncommitted blobs');
+ });
+
+ streamToArray(blobStatusStream, function(err) {
+ t.ok(err instanceof vdl.BlobNotCommittedError,
+ 'blob status stream should fail for uncommitted blobs');
+ assertBlobIsEmpty();
+ });
+ }
+
+ function assertBlobIsEmpty() {
+ blob.size(ctx, function(err, size) {
+ if (err) {
+ return end(err);
+ }
+ t.equals(size.toNativeNumber(), 0, 'blob is empty');
+
+ putToBlob();
+ });
+ }
+
+ function putToBlob() {
+ var byteStream = blob.put(ctx, function(err) {
+ if (err) {
+ return t.end(err);
+ }
+
+ assertBlobSize();
+ });
+
+ arrayToStream([data]).pipe(byteStream);
+ }
+
+ function assertBlobSize() {
+ blob.size(ctx, function(err, size) {
+ if (err) {
+ return end(err);
+ }
+ t.equals(size.toNativeNumber(), data.length, 'blob has correct size');
+
+ commitBlob();
+ });
+ }
+
+ function commitBlob() {
+ blob.commit(ctx, function(err) {
+ if (err) {
+ return end(err);
+ }
+
+ assertGetBlob();
+ });
+ }
+
+ function assertGetBlob() {
+ var blobStream = blob.get(ctx, 0, t.error);
+
+ streamToArray(blobStream, function(err, gotData) {
+ if (err) {
+ return end(err);
+ }
+
+ t.deepEquals(gotData, [data], 'blob has correct data');
+ end();
+ });
+ }
+
+ function end(err) {
+ t.error(err);
+ o.teardown(t.end);
+ }
+ });
+});