mojo/syncbase: Make RunInBatch work for readonly batches.
Also fix missing awaits in retry loop.
Change-Id: I69e55c07fca7190143ab1b68e9f870ff6a06bc77
diff --git a/lib/src/nosql/batch.dart b/lib/src/nosql/batch.dart
index a3ebe6f..e295fab 100644
--- a/lib/src/nosql/batch.dart
+++ b/lib/src/nosql/batch.dart
@@ -20,23 +20,29 @@
}
}
+typedef Future RunInBatchFunction(SyncbaseBatchDatabase b);
+
Future runInBatch(
- SyncbaseDatabase db, mojom.BatchOptions bo, Function fn) async {
+ SyncbaseDatabase db, mojom.BatchOptions bo, RunInBatchFunction fn) async {
Future attempt() async {
var b = await db.beginBatch(bo);
try {
- fn(b);
+ await fn(b);
} catch (err) {
try {
await b.abort();
} catch (ignoredErr) {}
throw err; // throw fn error, not abort error
}
- // TODO(sadovsky): commit() can fail for a number of reasons, e.g. RPC
- // failure or ErrConcurrentTransaction. Depending on the cause of
- // failure, it may be desirable to retry the commit() and/or to call
- // abort().
- await b.commit();
+ if (bo.readOnly) {
+ await b.abort();
+ } else {
+ // TODO(sadovsky): commit() can fail for a number of reasons, e.g. RPC
+ // failure or ErrConcurrentTransaction. Depending on the cause of
+ // failure, it may be desirable to retry the commit() and/or to call
+ // abort().
+ await b.commit();
+ }
}
Future retryLoop(int i) async {
@@ -44,13 +50,13 @@
await attempt();
} catch (err) {
// TODO(sadovsky): Only retry if err is ErrConcurrentTransaction.
- if (err && i < 2) {
- retryLoop(i + 1);
+ if (i < 2) {
+ await retryLoop(i + 1);
} else {
throw err;
}
}
}
- retryLoop(0);
+ await retryLoop(0);
}
diff --git a/test/integration/syncbase_batch_test.dart b/test/integration/syncbase_batch_test.dart
index c806a6a..98dfba5 100644
--- a/test/integration/syncbase_batch_test.dart
+++ b/test/integration/syncbase_batch_test.dart
@@ -5,6 +5,7 @@
library syncbase_batch_test;
import 'dart:async';
+import 'dart:convert' show UTF8;
import 'package:test/test.dart';
@@ -71,4 +72,57 @@
expect(batch2.commit(), throws);
expect(await utils.scan(tb, ''), equals(batch1Kvs));
});
+
+ test('runInBatch', () async {
+ var d = await setupDatabase(c);
+ var tableName = 'tb';
+ var tb = d.table(tableName);
+ await tb.create(utils.emptyPerms());
+
+ var retries = 0;
+ await runInBatch(d, SyncbaseClient.batchOptions(),
+ (SyncbaseBatchDatabase b) async {
+ ++retries;
+ // Read foo. It does not exist.
+ expect(b.table(tableName).get("foo-${retries}"), throws);
+ // If we need to fail, write to foo in a separate concurrent batch. This
+ // is always written on every attempt.
+ if (retries < 2) {
+ await d.table(tableName).put("foo-${retries}", UTF8.encode('foo'));
+ }
+ // Write to bar.
+ await b.table(tableName).put("bar-${retries}", UTF8.encode('bar'));
+ });
+ // First try failed, second succeeded.
+ expect(retries, equals(2));
+ expect(await utils.scan(tb, ''),
+ equals([new utils.KV('bar-2', 'bar'), new utils.KV('foo-1', 'foo')]));
+ });
+
+ test('runInBatchReadOnly', () async {
+ var d = await setupDatabase(c);
+ var tableName = 'tb';
+ var tb = d.table(tableName);
+ await tb.create(utils.emptyPerms());
+
+ await d.table(tableName).put("foo", UTF8.encode('foo'));
+ var retries = 0;
+ await runInBatch(d, SyncbaseClient.batchOptions(readOnly: true),
+ (SyncbaseBatchDatabase b) async {
+ ++retries;
+ // Read foo.
+ var before = await b.table(tableName).get("foo");
+ // Write to foo in a separate concurrent batch. This is always written
+ // on every iteration. It should not cause a retry since readonly
+ // batches are not committed.
+ await d.table(tableName).put("foo", UTF8.encode('oof'));
+ // Read foo again. Batch should not see the changed value.
+ expect(await b.table(tableName).get("foo"), equals(before));
+ // Try writing to bar. This should fail since the batch is readonly.
+ expect(b.table(tableName).put("bar", UTF8.encode('bar')), throws);
+ });
+ // Single uncommitted iteration.
+ expect(retries, equals(1));
+ expect(await utils.scan(tb, ''), equals([new utils.KV('foo', 'oof')]));
+ });
}