syncbase.js: Implement client batches.

Change-Id: I0d1a609d97abb49c1643f756f6f391374872d535
diff --git a/package.json b/package.json
index ff7c9f3..05622ef 100644
--- a/package.json
+++ b/package.json
@@ -9,15 +9,16 @@
     "through2": "~0.6.5"
   },
   "devDependencies": {
-    "jshint": "~2.7.0",
-    "prova": "aghassemi/prova#0.0.4",
-    "mkdirp": "~0.5.1",
-    "run-parallel": "~1.1.1",
-    "which": "~1.1.1",
-    "minimist": "~1.1.1",
-    "xtend": "~4.0.0",
     "async": "~1.0.0",
-    "stream-to-array": "~2.0.2"
+    "deep-equal": "~1.0.0",
+    "jshint": "~2.7.0",
+    "minimist": "~1.1.1",
+    "mkdirp": "~0.5.1",
+    "prova": "aghassemi/prova#0.0.4",
+    "run-parallel": "~1.1.1",
+    "stream-to-array": "~2.0.2",
+    "which": "~1.1.1",
+    "xtend": "~4.0.0"
   },
   "repository": {
     "type": "git",
diff --git a/src/nosql/batch-database.js b/src/nosql/batch-database.js
index 5f8c130..5f1b130 100644
--- a/src/nosql/batch-database.js
+++ b/src/nosql/batch-database.js
@@ -7,17 +7,24 @@
 /*
  * A handle to a set of reads and writes to the database that should be
  * considered an atomic unit. See beginBatch() for concurrency semantics.
+ *
+ * This constructor is private.  Use [database.beginBatch]{@link
+ * module:syncbase.nosql.Database.beginBatch} or [nosql.runInBatch]{@link
+ * module:syncbase.nosql~runInBatch} instead.
  * @constructor
- * @param {module:vanadium.syncbase.database.Database} db Database.
+ * @inner
+ * @param {module:syncbase.database.Database} db Database.
  */
 function BatchDatabase(db) {
   if (!(this instanceof BatchDatabase)) {
     return new BatchDatabase(db);
   }
 
-  this._db = db;
-
-  throw new Error('not implemented');
+  Object.defineProperty(this, '_db', {
+    enumerable: false,
+    value: db,
+    writeable: false
+  });
 }
 
 /**
@@ -25,8 +32,8 @@
  * @param {string} relativeName Table name.  Must not contain slashes.
  * @return {module:syncbase.table.Table} Table object.
  */
-BatchDatabase.prototype.table = function(ctx, relativeName, cb) {
-  return this._db.table(ctx, relativeName, cb);
+BatchDatabase.prototype.table = function(relativeName) {
+  return this._db.table(relativeName);
 };
 
 /**
@@ -35,7 +42,7 @@
  * @param {function} cb Callback.
  */
 BatchDatabase.prototype.listTables = function(ctx, cb) {
-  return this._db.listTables(ctx, cb);
+  this._db.listTables(ctx, cb);
 };
 
 /**
@@ -44,7 +51,7 @@
  * @param {function} cb Callback.
  */
 BatchDatabase.prototype.commit = function(ctx, cb) {
-  cb(new Error('not implemented'));
+  this._db._wire(ctx).commit(ctx, cb);
 };
 
 /**
@@ -55,5 +62,5 @@
  * @param {function} cb Callback.
  */
 BatchDatabase.prototype.abort = function(ctx, cb) {
-  cb(new Error('not implemented'));
+  this._db._wire(ctx).abort(ctx, cb);
 };
diff --git a/src/nosql/batch.js b/src/nosql/batch.js
index 49972eb..cd5fa36 100644
--- a/src/nosql/batch.js
+++ b/src/nosql/batch.js
@@ -4,4 +4,61 @@
 
 module.exports = runInBatch;
 
-function runInBatch(ctx, database, opts, runner) {}
\ No newline at end of file
+/**
+ * @summary
+ * runInBatch runs a function with a newly created batch. If the function
+ * errors, the batch is aborted. If the function succeeds, the batch is
+ * committed. If an error occurs during commit, then the batch is aborted.
+ *
+ * @param {module:vanadium.context.Context} ctx Vanadium context.
+ * @param {module:syncbase.database.Database} db Database.
+ * @param {module:vanadium.syncbase.nosql.BatchOptions} opts BatchOptions.
+ * @param {module:syncbase.nosql~runInBatchFn} fn Function to run inside a
+ * batch.
+ * @param {module:vanadium~voidCb} cb Callback that will be called after the
+ * batch has been committed or aborted.
+ *
+ * TODO(nlacasse): Add retry loop.
+ */
+function runInBatch(ctx, db, opts, fn, cb) {
+  db.beginBatch(ctx, opts, function(err, batchDb) {
+    if (err) {
+      return cb(err);
+    }
+
+    function onError(err) {
+      batchDb.abort(ctx, function() {
+        cb(err);
+      });
+    }
+
+    function onSuccess() {
+      // TODO(nlacasse): Commit() can fail for a number of reasons, e.g. RPC
+      // failure or ErrConcurrentTransaction. Depending on the cause of
+      // failure, it may be desirable to retry the Commit() and/or to call
+      // Abort(). For now, we always abort on a failed commit.
+      batchDb.commit(ctx, function(commitErr) {
+        if (commitErr) {
+          return onError(commitErr);
+        }
+        return cb(null);
+      });
+    }
+
+    fn(batchDb, function(err) {
+      if (err) {
+        return onError(err);
+      }
+      onSuccess();
+    });
+  });
+}
+
+/**
+ * A function that is run inside a batch by [runInBatch]{@link
+ * module:syncbase.nosql~runInBatch}.
+ * @callback module:syncbase.nosql~runInBatchFn
+ * @param {module:syncbase.batchDatabase.BatchDatabase} batch BatchDatabase.
+ * @param {object} service The stub object containing the exported
+ * methods of the remote service.
+ */
diff --git a/src/nosql/database.js b/src/nosql/database.js
index bdaec63..236eb77 100644
--- a/src/nosql/database.js
+++ b/src/nosql/database.js
@@ -6,6 +6,7 @@
 
 var vanadium = require('vanadium');
 
+var BatchDatabase = require('./batch-database');
 var nosqlVdl = require('../gen-vdl/v.io/syncbase/v23/services/syncbase/nosql');
 var Table = require('./table');
 var util = require('../util');
@@ -163,12 +164,6 @@
 };
 
 /**
- * Configuration options for Batches.
- * @constructor
- */
-Database.BatchOptions = nosqlVdl.BatchOptions;
-
-/**
  * Creates a new batch. Instead of calling this function directly, clients are
  * recommended to use the RunInBatch() helper function, which detects
  * "concurrent batch" errors and handles retries internally.
@@ -185,9 +180,21 @@
  *
  * Concurrency semantics can be configured using BatchOptions.
  * @param {module:vanadium.context.Context} ctx Vanadium context.
- * @param {module:vanadium.syncbase.Database.BatchOptions} opts BatchOptions.
+ * @param {module:vanadium.syncbase.nosql.BatchOptions} opts BatchOptions.
  * @param {function} cb Callback.
  */
 Database.prototype.beginBatch = function(ctx, opts, cb) {
-  cb(new Error('not implemented'));
+  var self = this;
+  this._wire(ctx).beginBatch(ctx, opts, function(err, relativeName) {
+    if (err) {
+      return cb(err);
+    }
+
+    // The relativeName returned from the beginBatch() call above is different
+    // than the relativeName of the current database. We must create a new
+    // Database with this new relativeName, and then create a BatchDatabase
+    // from that new Database.
+    var db = new Database(self._parentFullName, relativeName);
+    return cb(null, new BatchDatabase(db));
+  });
 };
diff --git a/src/nosql/index.js b/src/nosql/index.js
index 3b126cb..c0fe714 100644
--- a/src/nosql/index.js
+++ b/src/nosql/index.js
@@ -2,8 +2,9 @@
 // Use of this source code is governed by a BSD-style
 // license that can be found in the LICENSE file.
 
-var runInBatch = require('./batch');
 var rowrange = require('./rowrange');
+var runInBatch = require('./batch');
+var vdl = require('../gen-vdl/v.io/syncbase/v23/services/syncbase/nosql');
 
 /**
  * @summary
@@ -13,6 +14,8 @@
  * @memberof module:syncbase
  */
 module.exports = {
+  BatchOptions: vdl.BatchOptions,
+  ReadOnlyBatchError: vdl.ReadOnlyBatchError,
   rowrange: rowrange,
   runInBatch: runInBatch
-};
\ No newline at end of file
+};
diff --git a/src/nosql/rowrange.js b/src/nosql/rowrange.js
index c9bf049..98ed55a 100644
--- a/src/nosql/rowrange.js
+++ b/src/nosql/rowrange.js
@@ -126,4 +126,4 @@
 
   RowRange.call(this, startBytes, limitBytes);
 }
-inherits(PrefixRange, RowRange);
\ No newline at end of file
+inherits(PrefixRange, RowRange);
diff --git a/src/nosql/table.js b/src/nosql/table.js
index 9120179..fd44896 100644
--- a/src/nosql/table.js
+++ b/src/nosql/table.js
@@ -120,8 +120,14 @@
       cb(null, row);
     });
   });
+
   var stream = this._wire(ctx).scan(ctx, range.start, range.limit, cb).stream;
-  return stream.pipe(vomStreamDecoder);
+  var decodedStream = stream.pipe(vomStreamDecoder);
+  stream.on('error', function(err) {
+    decodedStream.emit('error', err);
+  });
+
+  return decodedStream;
 };
 
 /**
@@ -232,4 +238,4 @@
     writable: false,
     enumerable: true
   });
-}
\ No newline at end of file
+}
diff --git a/src/util.js b/src/util.js
index ed22bba..b17e96d 100644
--- a/src/util.js
+++ b/src/util.js
@@ -25,6 +25,17 @@
   var fullName = vanadium.naming.join(parentFullName, relativeName);
 
   /**
+   * @property _parentFullName
+   * @private
+   * @type {string}
+   */
+  Object.defineProperty(self, '_parentFullName', {
+    value: parentFullName,
+    writable: false,
+    enumerable: false
+  });
+
+  /**
    * @property name
    * @type {string}
    */
@@ -35,7 +46,7 @@
   });
 
   /**
-   * @property name
+   * @property fullName
    * @type {string}
    */
   Object.defineProperty(self, 'fullName', {
@@ -120,4 +131,4 @@
   }
 
   return bytes;
-}
\ No newline at end of file
+}
diff --git a/test/integration/test-batch.js b/test/integration/test-batch.js
new file mode 100644
index 0000000..576a3f6
--- /dev/null
+++ b/test/integration/test-batch.js
@@ -0,0 +1,481 @@
+// Copyright 2015 The Vanadium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+var async = require('async');
+var test = require('prova');
+
+var BatchDatabase = require('../../src/nosql/batch-database');
+
+var nosql = require('../..').nosql;
+var BatchOptions = nosql.BatchOptions;
+var range = nosql.rowrange;
+var ReadOnlyBatchError = nosql.ReadOnlyBatchError;
+
+var testUtil = require('./util');
+var assertScanRows = testUtil.assertScanRows;
+var setupDatabase = testUtil.setupDatabase;
+var setupTable = testUtil.setupTable;
+var uniqueName = testUtil.uniqueName;
+
+test('db.beginBatch creates a BatchDatabase with name', function(t) {
+  setupDatabase(t, function(err, o) {
+    if (err) {
+      return t.end(err);
+    }
+
+    o.database.beginBatch(o.ctx, new BatchOptions({}), function(err, batch) {
+      if (err) {
+        t.error(err);
+        return o.teardown(t.end);
+      }
+
+      t.ok(batch instanceof BatchDatabase, 'batch is a BatchDatabase');
+      t.notEqual(batch.name, o.database.name,
+                 'batch has different name than database');
+      t.notEqual(batch.fullName, o.database.fullName,
+                 'batch has different fullName than database');
+
+      o.teardown(t.end);
+    });
+  });
+});
+
+test('transactions are not visible until commit', function(t) {
+  setupTable(t, function(err, o) {
+    if (err) {
+      return t.end(err);
+    }
+
+    var ctx = o.ctx;
+    var db = o.database;
+    var table = o.table;
+
+    var keyName = uniqueName('key');
+    var value = uniqueName('val');
+
+    var emptyPrefix = range.prefix('');
+
+    db.beginBatch(ctx, new BatchOptions({}), put);
+
+    var batch;
+    function put(err, _batch) {
+      if (err) {
+        t.error(err);
+        return o.teardown(t.end);
+      }
+
+      batch = _batch;
+      var batchTable = batch.table(table.name);
+      batchTable.put(ctx, keyName, value, assertNoRows);
+    }
+
+    function assertNoRows(err) {
+      if (err) {
+        return end(err);
+      }
+
+      assertScanRows(ctx, table, emptyPrefix, [], commit);
+    }
+
+    function commit(err) {
+      if (err) {
+        return end(err);
+      }
+
+      batch.commit(ctx, assertRow);
+    }
+
+    function assertRow(err) {
+      if (err) {
+        return end(err);
+      }
+
+      var wantRows = [{
+        key: keyName,
+        value: value
+      }];
+
+      assertScanRows(ctx, table, emptyPrefix, wantRows, end);
+    }
+
+    function end(err) {
+      t.error(err);
+      o.teardown(t.end);
+    }
+  });
+});
+
+test('concurrent transactions are isolated', function(t) {
+  setupTable(t, function(err, o) {
+    if (err) {
+      return t.end(err);
+    }
+
+    var ctx = o.ctx;
+    var db = o.database;
+    var table = o.table;
+
+    var batches;
+    var batchTables;
+    var rows;
+
+    var emptyPrefix = range.prefix('');
+
+    startBatches();
+
+    // Create two batches.
+    function startBatches() {
+      async.times(2, function(n, cb) {
+        db.beginBatch(ctx, {}, cb);
+      }, addRows);
+    }
+
+    // Each batch adds a new row.
+    function addRows(err, _batches) {
+      if (err) {
+        return end(err);
+      }
+
+      batches = _batches;
+      batchTables = batches.map(function(batch) {
+        return batch.table(table.name);
+      });
+
+      async.mapSeries(batchTables, function(batchTable, cb) {
+        var key = uniqueName('key');
+        var value = uniqueName('value');
+        batchTable.put(ctx, key, value, function(err) {
+          if (err) {
+            return cb(err);
+          }
+          return cb(null, [{key: key, value: value}]);
+        });
+      }, assertBatchesSeeCorrectRows);
+    }
+
+    // Verify that each batch sees only its own rows.
+    function assertBatchesSeeCorrectRows(err, _rows) {
+      if (err) {
+        return end(err);
+      }
+
+      rows = _rows;
+      async.forEachOfSeries(batchTables, function(batchTable, idx, cb) {
+        // NOTE(nlacasse): Currently, a scan() inside a batch will return only
+        // the rows that existed in the snapshot when the batch was started.
+        // Thus, we can't use assertScanRows() to check that the batch has the
+        // correct rows.  Instead we must call get() on the table directly to
+        // ensure that the new rows exist in the snapshot.
+        batchTable.get(ctx, rows[idx][0].key, function(err, row) {
+          if (err) {
+            return cb(err);
+          }
+          t.equal(rows[idx].key, row.key, 'row has correct key');
+          t.equal(rows[idx].value, row.value, 'row has correct value');
+          return cb(null);
+        });
+      }, commitFirstBatch);
+    }
+
+    function commitFirstBatch(err) {
+      if (err) {
+        return end(err);
+      }
+
+      batches[0].commit(ctx, commitSecondBatch);
+    }
+
+    function commitSecondBatch(err) {
+      if (err) {
+        return end(err);
+      }
+
+      // Second batch should fail on commit.
+      batches[1].commit(ctx, function(err) {
+        t.ok(err, 'second batch should fail on commit');
+
+        assertFirstBatchesRowsExist();
+      });
+    }
+
+    function assertFirstBatchesRowsExist() {
+      // Check that only first batch's rows exist in table.
+      assertScanRows(ctx, table, emptyPrefix, rows[0], end);
+    }
+
+    function end(err) {
+      t.error(err);
+      o.teardown(t.end);
+    }
+  });
+});
+
+test('readonly batches', function(t) {
+  setupTable(t, function(err, o) {
+    if (err) {
+      return t.end(err);
+    }
+
+    var ctx = o.ctx;
+    var db = o.database;
+    var table = o.table;
+
+    var key = uniqueName('key');
+    var value = uniqueName('value');
+
+    var batch;
+    var batchTable;
+
+    table.put(ctx, key, value, startReadOnlyBatch);
+
+    function startReadOnlyBatch(err) {
+      if (err) {
+        return end(err);
+      }
+
+      var opts = new BatchOptions(new Map([
+        ['ReadOnly', true]
+      ]));
+
+      db.beginBatch(ctx, opts, attemptBatchPut);
+    }
+
+    function attemptBatchPut(err, _batch) {
+      if (err) {
+        return end(err);
+      }
+
+      batch = _batch;
+      batchTable = batch.table(table.name);
+
+      batchTable.put(ctx, uniqueName('key'), uniqueName('val'), function(err) {
+        assertReadOnlyBatchError(err);
+        attemptBatchDeletePrefix();
+      });
+    }
+
+    function attemptBatchDeletePrefix() {
+      batchTable.delete(ctx, range.prefix(key), function(err) {
+        assertReadOnlyBatchError(err);
+        attemptBatchDeleteRow();
+      });
+    }
+
+    function attemptBatchDeleteRow() {
+      batchTable.row(key).delete(ctx, function(err) {
+        assertReadOnlyBatchError(err);
+        end();
+      });
+    }
+
+    function end(err) {
+      t.error(err);
+      o.teardown(t.end);
+    }
+
+    function assertReadOnlyBatchError(err) {
+      t.ok(err, 'should error');
+      t.ok(err instanceof ReadOnlyBatchError,
+           'err should be ReadOnlyBatchError');
+    }
+
+  });
+});
+
+test('new batch operations fail after successful batch commit', function(t) {
+  setupTable(t, function(err, o) {
+    if (err) {
+      return t.end(err);
+    }
+
+    var ctx = o.ctx;
+    var db = o.database;
+    var table = o.table;
+
+    db.beginBatch(ctx, {}, put);
+
+    var batch;
+
+    function put(err, _batch) {
+      if (err) {
+        return end(err);
+      }
+
+      batch = _batch;
+      var batchTable = batch.table(table.name);
+
+      batchTable.put(ctx, uniqueName('key'), uniqueName('val'), commit);
+    }
+
+    function commit(err) {
+      if (err) {
+        return end(err);
+      }
+
+      batch.commit(ctx, function(err) {
+        if (err) {
+          return end(err);
+        }
+
+        assertOpsFail(t, ctx, batch, table.name, end);
+      });
+    }
+
+    function end(err) {
+      t.error(err);
+      o.teardown(t.end);
+    }
+  });
+});
+
+test('new batch operations fail after unsuccessful batch commit', function(t) {
+  setupTable(t, function(err, o) {
+
+    var ctx = o.ctx;
+    var db = o.database;
+    var table = o.table;
+
+    db.beginBatch(ctx, {}, putTable);
+
+    var key = uniqueName('key');
+    var value = uniqueName('value');
+
+    var batch;
+
+    function putTable(err, _batch) {
+      if (err) {
+        return end(err);
+      }
+
+      batch = _batch;
+
+      // Put on the table directly, not the batch table.  This will conflict
+      // with future batchTable.put() call.
+      table.put(ctx, key, value, putBatchTable);
+    }
+
+    function putBatchTable(err) {
+      if (err) {
+        return end(err);
+      }
+
+      var newValue = uniqueName('value');
+
+      var batchTable = batch.table(table.name);
+      batchTable.put(ctx, key, newValue, commit);
+    }
+
+    function commit(err) {
+      if (err) {
+        return end(err);
+      }
+
+      batch.commit(ctx, function(err) {
+        t.ok(err, 'commit() should error');
+        assertOpsFail(t, ctx, batch, table.name, end);
+      });
+    }
+
+    function end(err) {
+      t.error(err);
+      o.teardown(t.end);
+    }
+  });
+});
+
+test('new batch operations fail after batch is aborted', function(t) {
+  setupTable(t, function(err, o) {
+    if (err) {
+      return end(err);
+    }
+
+    var ctx = o.ctx;
+    var db = o.database;
+    var table = o.table;
+
+    db.beginBatch(ctx, {}, abort);
+
+    function abort(err, batch) {
+      if (err) {
+        return end(err);
+      }
+
+      batch.abort(ctx, function(err) {
+        if (err) {
+          return end(err);
+        }
+
+        assertOpsFail(t, ctx, batch, table.name, end);
+      });
+    }
+
+    function end(err) {
+      t.error(err);
+      o.teardown(t.end);
+    }
+  });
+});
+
+function assertOpsFail(t, ctx, batch, tableName, cb) {
+  var batchTable = batch.table(tableName);
+
+  async.series([
+    assertGetFails,
+    assertScanFails,
+    assertPutFails,
+    assertDeleteFails,
+    assertRowDeleteFails,
+    assertCommitFails
+  ], cb);
+
+  function assertGetFails(cb) {
+    batchTable.get(ctx, uniqueName('key'), function(err) {
+      t.ok(err, 'get() should error');
+      cb(null);
+    });
+  }
+
+  function assertScanFails(cb) {
+    var streamGotError = false;
+
+    var stream = batchTable.scan(ctx, range.prefix(''), function(err) {
+      t.ok(err, 'scan() should pass error to callback');
+      t.ok(streamGotError, 'scan() should send error to stream');
+      cb(null);
+    });
+
+    stream.on('error', function(err) {
+      streamGotError = true;
+    });
+  }
+
+  function assertPutFails(cb) {
+    batchTable.put(ctx, uniqueName('key'), uniqueName('val'),
+                   function(err) {
+      t.ok(err, 'put() should error');
+      cb(null);
+    });
+  }
+
+  function assertDeleteFails(cb) {
+    batchTable.delete(ctx, range.prefix(uniqueName('key')), function(err) {
+      t.ok(err, 'delete() should error');
+      cb(null);
+    });
+  }
+
+  function assertRowDeleteFails(cb) {
+    batchTable.row(uniqueName('key')).delete(ctx, function(err) {
+      t.ok(err, 'row.delete() should error');
+      cb(null);
+    });
+  }
+
+  function assertCommitFails(cb) {
+    batch.commit(ctx, function(err) {
+      t.ok(err, 'commit() should error');
+      cb(null);
+    });
+  }
+}
diff --git a/test/integration/test-database.js b/test/integration/test-database.js
index 3509bcf..ad3b4c2 100644
--- a/test/integration/test-database.js
+++ b/test/integration/test-database.js
@@ -134,9 +134,7 @@
   });
 });
 
-//TODO(aghassemi) This does not seem to be the case anymore, did something in Go
-//change? (also we need CI and presubmit for these tests now to prevent this)
-test.skip('deleting a db that has not been created should error', function(t) {
+test('deleting a db that has not been created should not error', function(t) {
   setupApp(t, function(err, o) {
     if (err) {
       return t.end(err);
@@ -145,7 +143,7 @@
     var db = o.app.noSqlDatabase(uniqueName('db'));
 
     db.delete(o.ctx, function(err) {
-      t.ok(err, 'should error');
+      t.error(err);
       o.teardown(t.end);
     });
   });
@@ -226,9 +224,7 @@
   });
 });
 
-//TODO(aghassemi) This does not seem to be the case anymore, did something in Go
-//change? (also we need CI and presubmit for these tests now to prevent this)
-test.skip('deleting a table that does not exist should error', function(t) {
+test('deleting a table that does not exist should error', function(t) {
   setupDatabase(t, function(err, o) {
     if (err) {
       return t.end(err);
@@ -238,7 +234,7 @@
     var tableName = uniqueName('table');
 
     db.deleteTable(o.ctx, tableName, function(err) {
-      t.ok(err, 'should error.');
+      t.error(err);
       o.teardown(t.end);
     });
   });
@@ -255,4 +251,4 @@
       return o.teardown(t.end);
     });
   });
-});
\ No newline at end of file
+});
diff --git a/test/integration/test-run-in-batch.js b/test/integration/test-run-in-batch.js
new file mode 100644
index 0000000..84011f3
--- /dev/null
+++ b/test/integration/test-run-in-batch.js
@@ -0,0 +1,97 @@
+// Copyright 2015 The Vanadium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+var test = require('prova');
+
+var runInBatch = require('../..').nosql.runInBatch;
+
+function MockBatchDb(failOnCommit) {
+  this.abortCalled = false;
+  this.commitCalled = false;
+  this._failOnCommit = failOnCommit;
+}
+
+MockBatchDb.prototype.abort = function(ctx, cb) {
+  this.abortCalled = true;
+  cb();
+};
+
+MockBatchDb.prototype.commit = function(ctx, cb) {
+  this.commitCalled = true;
+  if (this._failOnCommit) {
+    return cb(new Error('error committing'));
+  }
+  cb();
+};
+
+function MockDb(failOnCommit) {
+  this.batchDb = null;
+  this._failOnCommit = failOnCommit;
+}
+
+MockDb.prototype.beginBatch = function(ctx, opts, cb) {
+  this.batchDb = new MockBatchDb(this._failOnCommit);
+  return cb(null, this.batchDb);
+};
+
+test('runInBatch commits on success', function(t) {
+  var ctx = {};
+  var db = new MockDb();
+
+  function willSucceed(db, cb) {
+    cb(null);
+  }
+
+  runInBatch(ctx, db, {}, willSucceed, function(err) {
+    if (err) {
+      return t.end(err);
+    }
+
+    t.ok(db.batchDb, 'batch db is created');
+    t.ok(db.batchDb.commitCalled, 'batchDb.commit() was called');
+    t.notok(db.batchDb.abortCalled, 'batchDb.abort() was not called');
+
+    t.end();
+  });
+});
+
+test('runInBatch aborts on failure', function(t) {
+  var ctx = {};
+  var db = new MockDb();
+  var error = new Error('boom!');
+
+  function willFail(db, cb) {
+    cb(error);
+  }
+
+  runInBatch(ctx, db, {}, willFail, function(err) {
+    t.ok(err, 'runInBatch should return an error');
+    t.equal(err, error, 'runInBatch returns the correct error');
+
+    t.ok(db.batchDb, 'batch db is created');
+    t.notok(db.batchDb.commitCalled, 'batchDb.commit() was not called');
+    t.ok(db.batchDb.abortCalled, 'batchDb.abort() was called');
+
+    t.end();
+  });
+});
+
+test('runInBatch aborts if commit fails', function(t) {
+  var ctx = {};
+  var db = new MockDb(true);
+
+  function willSucceed(db, cb) {
+    cb(null);
+  }
+
+  runInBatch(ctx, db, {}, willSucceed, function(err) {
+    t.ok(err, 'runInBatch should return an error');
+
+    t.ok(db.batchDb, 'batch db is created');
+    t.ok(db.batchDb.commitCalled, 'batchDb.commit() was called');
+    t.ok(db.batchDb.abortCalled, 'batchDb.abort() was called');
+
+    t.end();
+  });
+});
diff --git a/test/integration/test-table.js b/test/integration/test-table.js
index dcb2a42..3714eba 100644
--- a/test/integration/test-table.js
+++ b/test/integration/test-table.js
@@ -3,12 +3,12 @@
 // license that can be found in the LICENSE file.
 
 var async = require('async');
-var streamToArray = require('stream-to-array');
 var test = require('prova');
 
-var testUtil = require('./util');
 var syncbase = require('../..');
 
+var testUtil = require('./util');
+var assertScanRows = testUtil.assertScanRows;
 var setupTable = testUtil.setupTable;
 var uniqueName = testUtil.uniqueName;
 
@@ -84,24 +84,13 @@
         return o.teardown(t.end);
       }
 
+      var wantRows = [{
+        key: key,
+        value: value
+      }];
       var range = syncbase.nosql.rowrange.singleRow(key);
-      var stream = table.scan(o.ctx, range, function(err) {
-        if (err) {
-          t.error(err);
-          return o.teardown(t.end);
-        }
-      });
-
-      streamToArray(stream, function(err, values) {
-        if (err) {
-          t.error(err);
-          return o.teardown(t.end);
-        }
-
-        var row = values[0];
-        t.ok(row, 'row exists');
-        t.deepEquals(row.key, key, 'got expected key from scan');
-        t.deepEquals(row.value, value, 'got expected value from scan');
+      assertScanRows(o.ctx, table, range, wantRows, function(err) {
+        t.error(err);
         o.teardown(t.end);
       });
     }
@@ -143,22 +132,8 @@
       }
 
       var range = syncbase.nosql.rowrange.prefix(ROW_KEY);
-      var stream = table.scan(o.ctx, range, function(err) {
-        if (err) {
-          t.error(err);
-          return o.teardown(t.end);
-        }
-      });
-
-      streamToArray(stream, function(err, rows) {
-        if (err) {
-          t.error(err);
-          return o.teardown(t.end);
-        }
-
-        t.ok(rows, 'got some rows');
-        t.deepEquals(rows.sort(), prefixedRows.sort(),
-          'got expected results from scan');
+      assertScanRows(o.ctx, table, range, prefixedRows, function(err) {
+        t.error(err);
         o.teardown(t.end);
       });
     }
@@ -202,20 +177,9 @@
         return o.teardown(t.end);
       }
 
-      var stream = table.scan(o.ctx, range, function(err) {
-        if (err) {
-          t.error(err);
-          return o.teardown(t.end);
-        }
-      });
-
-      streamToArray(stream, function(err, rows) {
-        if (err) {
-          t.error(err);
-          return o.teardown(t.end);
-        }
-
-        t.deepEquals(rows, [], 'Rows were deleted successfully');
+      var wantRows = [];
+      assertScanRows(o.ctx, table, range, wantRows, function(err) {
+        t.error(err);
         o.teardown(t.end);
       });
     }
@@ -326,4 +290,4 @@
       }
     }
   });
-});
\ No newline at end of file
+});
diff --git a/test/integration/util.js b/test/integration/util.js
index 545d8fe..6f8d934 100644
--- a/test/integration/util.js
+++ b/test/integration/util.js
@@ -12,13 +12,15 @@
   setupService: setupService,
   setupTable: setupTable,
 
-  uniqueName: uniqueName,
-
-  testGetSetPermissions: testGetSetPermissions
+  assertScanRows: assertScanRows,
+  testGetSetPermissions: testGetSetPermissions,
+  uniqueName: uniqueName
 };
 
-var vanadium = require('vanadium');
+var deepEqual = require('deep-equal');
 var extend = require('xtend');
+var streamToArray = require('stream-to-array');
+var vanadium = require('vanadium');
 
 var syncbase = require('../..');
 
@@ -209,4 +211,47 @@
 
     cb(null, names.indexOf(name) >= 0);
   });
-}
\ No newline at end of file
+}
+
+function compareRows(r1, r2) {
+  if (r1.key > r2.key) {
+    return 1;
+  }
+  if (r1.key < r2.key) {
+    return -1;
+  }
+  if (r1.value > r2.value) {
+    return 1;
+  }
+  if (r1.value < r2.value) {
+    return -1;
+  }
+  return 0;
+}
+
+function assertScanRows(ctx, table, range, wantRows, cb) {
+  var stream = table.scan(ctx, range, function(err) {
+    if (err) {
+      return cb(err);
+    }
+  });
+
+  streamToArray(stream, function(err, rows) {
+    if (err) {
+      return cb(err);
+    }
+
+    rows = rows || [];
+
+    rows.sort(compareRows);
+    wantRows.sort(compareRows);
+
+    if (!deepEqual(rows, wantRows)) {
+      var error = new Error('Expected rows to be ' + JSON.stringify(wantRows) +
+                        ' but got ' + JSON.stringify(rows));
+      return cb(error);
+    }
+
+    cb(null);
+  });
+}
diff --git a/test/start-syncbased.sh b/test/start-syncbased.sh
index cf54fb0..8fe21d6 100755
--- a/test/start-syncbased.sh
+++ b/test/start-syncbased.sh
@@ -10,4 +10,4 @@
 # fix service-runner to allow flags/arguments, and then have it start syncbased
 # directly with the appropriate flags.  Then we can delete this file.
 
-syncbased -v=3 --name test/syncbased
+syncbased -v=3 --name test/syncbased --engine memstore