mojo/syncbase: Make RunInBatch work for readonly batches.

Also fix missing awaits in retry loop.

Change-Id: I69e55c07fca7190143ab1b68e9f870ff6a06bc77
diff --git a/lib/src/nosql/batch.dart b/lib/src/nosql/batch.dart
index a3ebe6f..e295fab 100644
--- a/lib/src/nosql/batch.dart
+++ b/lib/src/nosql/batch.dart
@@ -20,23 +20,29 @@
   }
 }
 
+typedef Future RunInBatchFunction(SyncbaseBatchDatabase b);
+
 Future runInBatch(
-    SyncbaseDatabase db, mojom.BatchOptions bo, Function fn) async {
+    SyncbaseDatabase db, mojom.BatchOptions bo, RunInBatchFunction fn) async {
   Future attempt() async {
     var b = await db.beginBatch(bo);
     try {
-      fn(b);
+      await fn(b);
     } catch (err) {
       try {
         await b.abort();
       } catch (ignoredErr) {}
       throw err; // throw fn error, not abort error
     }
-    // TODO(sadovsky): commit() can fail for a number of reasons, e.g. RPC
-    // failure or ErrConcurrentTransaction. Depending on the cause of
-    // failure, it may be desirable to retry the commit() and/or to call
-    // abort().
-    await b.commit();
+    if (bo.readOnly) {
+      await b.abort();
+    } else {
+      // TODO(sadovsky): commit() can fail for a number of reasons, e.g. RPC
+      // failure or ErrConcurrentTransaction. Depending on the cause of
+      // failure, it may be desirable to retry the commit() and/or to call
+      // abort().
+      await b.commit();
+    }
   }
 
   Future retryLoop(int i) async {
@@ -44,13 +50,13 @@
       await attempt();
     } catch (err) {
       // TODO(sadovsky): Only retry if err is ErrConcurrentTransaction.
-      if (err && i < 2) {
-        retryLoop(i + 1);
+      if (i < 2) {
+        await retryLoop(i + 1);
       } else {
         throw err;
       }
     }
   }
 
-  retryLoop(0);
+  await retryLoop(0);
 }
diff --git a/test/integration/syncbase_batch_test.dart b/test/integration/syncbase_batch_test.dart
index c806a6a..98dfba5 100644
--- a/test/integration/syncbase_batch_test.dart
+++ b/test/integration/syncbase_batch_test.dart
@@ -5,6 +5,7 @@
 library syncbase_batch_test;
 
 import 'dart:async';
+import 'dart:convert' show UTF8;
 
 import 'package:test/test.dart';
 
@@ -71,4 +72,57 @@
     expect(batch2.commit(), throws);
     expect(await utils.scan(tb, ''), equals(batch1Kvs));
   });
+
+  test('runInBatch', () async {
+    var d = await setupDatabase(c);
+    var tableName = 'tb';
+    var tb = d.table(tableName);
+    await tb.create(utils.emptyPerms());
+
+    var retries = 0;
+    await runInBatch(d, SyncbaseClient.batchOptions(),
+        (SyncbaseBatchDatabase b) async {
+      ++retries;
+      // Read foo. It does not exist.
+      expect(b.table(tableName).get("foo-${retries}"), throws);
+      // If we need to fail, write to foo in a separate concurrent batch. This
+      // is always written on every attempt.
+      if (retries < 2) {
+        await d.table(tableName).put("foo-${retries}", UTF8.encode('foo'));
+      }
+      // Write to bar.
+      await b.table(tableName).put("bar-${retries}", UTF8.encode('bar'));
+    });
+    // First try failed, second succeeded.
+    expect(retries, equals(2));
+    expect(await utils.scan(tb, ''),
+        equals([new utils.KV('bar-2', 'bar'), new utils.KV('foo-1', 'foo')]));
+  });
+
+  test('runInBatchReadOnly', () async {
+    var d = await setupDatabase(c);
+    var tableName = 'tb';
+    var tb = d.table(tableName);
+    await tb.create(utils.emptyPerms());
+
+    await d.table(tableName).put("foo", UTF8.encode('foo'));
+    var retries = 0;
+    await runInBatch(d, SyncbaseClient.batchOptions(readOnly: true),
+        (SyncbaseBatchDatabase b) async {
+      ++retries;
+      // Read foo.
+      var before = await b.table(tableName).get("foo");
+      // Write to foo in a separate concurrent batch. This is always written
+      // on every iteration. It should not cause a retry since readonly
+      // batches are not committed.
+      await d.table(tableName).put("foo", UTF8.encode('oof'));
+      // Read foo again. Batch should not see the changed value.
+      expect(await b.table(tableName).get("foo"), equals(before));
+      // Try writing to bar. This should fail since the batch is readonly.
+      expect(b.table(tableName).put("bar", UTF8.encode('bar')), throws);
+    });
+    // Single uncommitted iteration.
+    expect(retries, equals(1));
+    expect(await utils.scan(tb, ''), equals([new utils.KV('foo', 'oof')]));
+  });
 }