mojo/syncbase: Proper flow control in Dart.
Adding flow control between Dart streams <=> mojom <=> syncbase.
Previous CL (16134) added flow control to
syncbase's mojom API by implementing the observer + ack
pattern where clients have to ACK previous changes before
receiving more changes from the server.
This CL integrates that functionality with Dart streams
by ensuring that we do not ACK if
1) stream is paused.
2) stream has no listeners.
This means when the watch Dart stream is paused, we
push back all the way to syncbase server so client
never buffers and instead server buffers at the other
end of the pipe.
Change-Id: I4297d2c5e836bab626a51fa6f4dafedea91e2710
diff --git a/dart/bin/syncbase_example.dart b/dart/bin/syncbase_example.dart
index 03bde77..69e3eee 100644
--- a/dart/bin/syncbase_example.dart
+++ b/dart/bin/syncbase_example.dart
@@ -55,7 +55,8 @@
try {
var s = db.watch(table.name, '', await db.getResumeMarker());
s.forEach((change) {
- print('GOT CHANGE: ${change.rowName} - ${UTF8.decode(change.valueBytes)} - ${change.fromSync}');
+ print(
+ 'GOT CHANGE: ${change.rowName} - ${UTF8.decode(change.valueBytes)} - ${change.fromSync}');
});
} catch (e) {
print('ERROR in startWatch()');
@@ -78,7 +79,7 @@
print(e);
}
- await new Future.delayed(new Duration(seconds:5));
+ await new Future.delayed(new Duration(seconds: 5));
startPuts(table);
}
@@ -98,7 +99,8 @@
return app;
}
-Future<sb.SyncbaseNoSqlDatabase> createDb(sb.SyncbaseApp app, String name) async {
+Future<sb.SyncbaseNoSqlDatabase> createDb(
+ sb.SyncbaseApp app, String name) async {
var db = app.noSqlDatabase(name);
var exists = await db.exists();
if (exists) {
@@ -110,7 +112,8 @@
return db;
}
-Future<sb.SyncbaseTable> createTable(sb.SyncbaseNoSqlDatabase db, String name) async {
+Future<sb.SyncbaseTable> createTable(
+ sb.SyncbaseNoSqlDatabase db, String name) async {
var table = db.table(name);
var exists = await table.exists();
if (exists) {
@@ -122,7 +125,8 @@
return table;
}
-Future<sb.SyncbaseSyncGroup> joinOrCreateSyncGroup(sb.SyncbaseNoSqlDatabase db, String mtAddr, String tableName, String name) async {
+Future<sb.SyncbaseSyncGroup> joinOrCreateSyncGroup(sb.SyncbaseNoSqlDatabase db,
+ String mtAddr, String tableName, String name) async {
// TODO(nlacasse): Get your email address out of here! Figure out a way to
// get the mounttable name and path to this part of the code.
var mtName = naming.join(mtAddr, 'users/nlacasse@google.com');
@@ -144,12 +148,9 @@
print('syncgroup does not exist, creating it');
var sgSpec = sb.SyncbaseClient.syncGroupSpec(
- description: 'test sync group',
- perms: openPerms,
- // Sync the entire table.
- prefixes: ['$tableName:'],
- mountTables: [mtName]
- );
+ description: 'test sync group', perms: openPerms,
+ // Sync the entire table.
+ prefixes: ['$tableName:'], mountTables: [mtName]);
print('SGSPEC = $sgSpec');
diff --git a/dart/lib/src/nosql/database.dart b/dart/lib/src/nosql/database.dart
index f45727c..3a690a6 100644
--- a/dart/lib/src/nosql/database.dart
+++ b/dart/lib/src/nosql/database.dart
@@ -137,20 +137,32 @@
}
}
-class WatchGlobStreamImpl implements mojom.WatchGlobStream {
+class WatchGlobStreamImpl extends Object with StreamFlowControl
+ implements mojom.WatchGlobStream {
final StreamController<mojom.WatchChange> sc;
- WatchGlobStreamImpl._fromStreamController(this.sc);
+
+ WatchGlobStreamImpl._fromStreamController(this.sc) {
+ setupFlowControl(this.sc);
+ }
Future<mojom.WatchGlobStreamOnChangeResponseParams> onChange(
mojom.WatchChange change, Function resultFactory) {
+
+ // Testing instrumentations for testing flow control.
+ if (testing.isTesting) {
+ testing.DatabaseWatch.onChangeCounter.increment();
+ }
+
sc.add(change);
- // TODO(aghassemi): Honor the pause state.
- // If stream is paused, return a future that will be completed when stream
- // is resumed. Otherwise we are breaking Dart stream's flow control.
+ // Only ack after we get unlocked.
+ // If we are not locked, onNextUnlock completes immediately.
+ var ack = onNextUnlock().then((_) {
- // Send an ack back to server.
- return new Future.value(resultFactory(true));
+ // Send an ack back to server.
+ return new Future.value(resultFactory(true));
+ });
+ return ack;
}
// Called by the mojo proxy when the Go function call returns.
diff --git a/dart/lib/src/nosql/table.dart b/dart/lib/src/nosql/table.dart
index bb503cb..83a1903 100644
--- a/dart/lib/src/nosql/table.dart
+++ b/dart/lib/src/nosql/table.dart
@@ -97,20 +97,26 @@
}
}
-class ScanStreamImpl implements mojom.ScanStream {
+class ScanStreamImpl extends Object with StreamFlowControl
+ implements mojom.ScanStream {
final StreamController<mojom.KeyValue> sc;
- ScanStreamImpl._fromStreamController(this.sc);
+
+ ScanStreamImpl._fromStreamController(this.sc) {
+ setupFlowControl(this.sc);
+ }
Future<mojom.ScanStreamOnChangeResponseParams> onKeyValue(
mojom.KeyValue keyValue, Function resultFactory) {
sc.add(keyValue);
- // TODO(aghassemi): Honor the pause state.
- // If stream is paused, return a future that will be completed when stream
- // is resumed. Otherwise we are breaking Dart stream's flow control.
+ // Only ack after we get unlocked.
+ // If we are not locked, onNextUnlock completes immediately.
+ var ack = onNextUnlock().then((_) {
- // Send an ack back to server.
- return new Future.value(resultFactory(true));
+ // Send an ack back to server.
+ return new Future.value(resultFactory(true));
+ });
+ return ack;
}
// Called by the mojom proxy when the Go function call returns.
diff --git a/dart/lib/src/stream_flow_control.dart b/dart/lib/src/stream_flow_control.dart
new file mode 100644
index 0000000..e62d3b6
--- /dev/null
+++ b/dart/lib/src/stream_flow_control.dart
@@ -0,0 +1,59 @@
+// Copyright 2015 The Vanadium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+part of syncbase_client;
+
+// StreamFlowControl is a mixin that exposes methods setupFlowControl() and
+// onNextUnlock() to allow mixers add flow control to their stream controllers.
+class StreamFlowControl {
+ // We are in "locked" state if and only if _mutex is not null.
+ // We are locked by default until we gain the first subscriber.
+ Future _mutex = new Completer();
+
+ // Setup flow control by adding listeners to the stream controller.
+ // Mixin classes can't have constructors, so this is just a method that should
+ // be called from mixer's constructor.
+ setupFlowControl(StreamController sc) {
+ // Unlock when gaining the first subscriber.
+ sc.onListen = _unlock;
+ // Lock when losing the last subscriber.
+ sc.onCancel = _lock;
+ // Lock when paused.
+ sc.onPause = _lock;
+ // Unlock when resumed.
+ sc.onResume = _unlock;
+ }
+
+ // Returns a future that either completes immediately if we are not locked or
+ // if we are locked, it gets completed as soon as we get unlocked.
+ // Mixers can use this method to decide when to ack, telling the server to
+ // continue sending events.
+ Future onNextUnlock() {
+ if (_mutex == null) {
+ // We are not locked, return a completed future.
+ return new Future.value();
+ }
+ return _mutex.future;
+ }
+
+ // Locks the stream controller.
+ // When locked, server does not sent us change events anymore until we unlock.
+ // This happens because we don't send back an ack to the server when locked.
+ _lock() {
+ if (_mutex == null) {
+ _mutex = new Completer();
+ }
+ }
+
+ // Unlcoks the stream controller.
+ // When unlocked, server can send up more change events.
+ // This happens because we send back an ack to the server when unlocked after
+ // every change we receive instructing the server to send more.
+ _unlock() {
+ if (_mutex != null) {
+ _mutex.complete();
+ _mutex = null;
+ }
+ }
+}
diff --git a/dart/lib/src/testing_instrumentations.dart b/dart/lib/src/testing_instrumentations.dart
new file mode 100644
index 0000000..00a1c2e
--- /dev/null
+++ b/dart/lib/src/testing_instrumentations.dart
@@ -0,0 +1,36 @@
+// Copyright 2015 The Vanadium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+// This library provides some globals that can be accessed from tests and
+// implementation code so they can communicate certain private states to each
+// other enabling otherwise hard-to-do end-to-end tests.
+// Although mocking is a preferred approach, to allow mocking of private classes
+// deep in the stack, we would need to implement a full dependency injection
+// solution since Dart does not allow mocking libraries and files merely based
+// on import path.
+library testing_instrumentations;
+
+// To be set to true by test runner, any implementation code that needs to
+// instrument for testing, must check this first.
+bool isTesting = false;
+
+// Testing instrumentations for Database Watch implementation.
+abstract class DatabaseWatch {
+ // Allows Database Watch implementation to communicate how many changes
+ // it has received from the server.
+ static _Counter onChangeCounter = new Counter._internal();
+}
+
+// A generic counter.
+class Counter {
+ num count = 0;
+ Counter._internal();
+
+ increment() {
+ count++;
+ }
+ reset() {
+ count = 0;
+ }
+}
diff --git a/dart/lib/syncbase_client.dart b/dart/lib/syncbase_client.dart
index 39ec4f5..f81b4cd 100644
--- a/dart/lib/syncbase_client.dart
+++ b/dart/lib/syncbase_client.dart
@@ -9,6 +9,7 @@
import 'package:mojo/bindings.dart' as bindings;
import 'package:ether/src/naming/util.dart' as naming;
+import 'package:ether/src/testing_instrumentations.dart' as testing;
import 'gen/dart-gen/mojom/lib/mojo/syncbase.mojom.dart' as mojom;
@@ -18,6 +19,7 @@
part 'src/app.dart';
part 'src/named_resource.dart';
+part 'src/stream_flow_control.dart';
part 'src/util.dart';
part 'src/nosql/database.dart';
part 'src/nosql/row.dart';
diff --git a/dart/test/integration/syncbase_database_test.dart b/dart/test/integration/syncbase_database_test.dart
index e3dd55e..cd00f45 100644
--- a/dart/test/integration/syncbase_database_test.dart
+++ b/dart/test/integration/syncbase_database_test.dart
@@ -4,12 +4,14 @@
library syncbase_database_test;
+import 'dart:async';
import 'dart:convert' show UTF8;
import 'package:test/test.dart';
+import 'package:ether/src/testing_instrumentations.dart' as testing;
import 'package:ether/syncbase_client.dart'
- show SyncbaseClient, WatchChangeTypes, WatchChange;
+ show SyncbaseClient, WatchChangeTypes, WatchChange, WatchGlobStreamImpl;
import './utils.dart' as utils;
@@ -90,5 +92,63 @@
}
});
+ test('watch flow control', () async {
+ var app = c.app(utils.uniqueName('app'));
+ await app.create(utils.emptyPerms());
+ var db = app.noSqlDatabase(utils.uniqueName('db'));
+ await db.create(utils.emptyPerms());
+ var table = db.table(utils.uniqueName('table'));
+ await table.create(utils.emptyPerms());
+
+ var resumeMarker = await db.getResumeMarker();
+ var aFewMoments = new Duration(seconds: 1);
+ const int numOperations = 10;
+ var allOperations = [];
+
+ // Do several put operations in parallel and wait until they are all done.
+ for (var i = 0; i < numOperations; i++) {
+ allOperations.add(table.put('row $i', UTF8.encode('value$i')));
+ }
+ await Future.wait(allOperations);
+
+ // Reset testing instrumentations.
+ testing.DatabaseWatch.onChangeCounter.reset();
+
+ // Create a watch stream.
+ var watchStream = db.watch(table.name, '', resumeMarker);
+
+ // Listen for the data on the stream.
+ var allExpectedChangesReceived = new Completer();
+ Function onData(_) {
+ if (testing.DatabaseWatch.onChangeCounter.count == numOperations) {
+ allExpectedChangesReceived.complete();
+ }
+ }
+ var streamListener = watchStream.listen(onData);
+
+ // Pause the stream.
+ streamListener.pause();
+
+ // Wait a few moments.
+ await new Future.delayed(aFewMoments);
+
+ // Assert that we did not got any* events from server when paused.
+ // testing.DatabaseWatch.onChangeCounter instrumentation is used to ensure
+ // that client did not receive any updates from mojo server, guaranteeing
+ // flow control propagated properly all the way to the other end of the pipe
+ // *Note: We always get 1 change before we can tell the server to block by
+ // not acking that single change.
+ expect(testing.DatabaseWatch.onChangeCounter.count, equals(1));
+
+ // Resume the stream.
+ streamListener.resume();
+
+ // Wait until we get all expected changes.
+ await allExpectedChangesReceived.future;
+
+ // Assert we've got all the expected changes after resuming.
+ expect(testing.DatabaseWatch.onChangeCounter.count, equals(numOperations));
+ });
+
// TODO(nlacasse): Test database.get/setPermissions.
}
diff --git a/dart/test/integration/syncbase_test.dart b/dart/test/integration/syncbase_test.dart
index e423cb8..b8a9002 100755
--- a/dart/test/integration/syncbase_test.dart
+++ b/dart/test/integration/syncbase_test.dart
@@ -8,6 +8,7 @@
import 'package:mojo/core.dart' show MojoHandle;
import 'package:test/test.dart';
+import 'package:ether/src/testing_instrumentations.dart' as testing;
import 'package:ether/initialized_application.dart' show InitializedApplication;
import 'package:ether/syncbase_client.dart' show SyncbaseClient;
@@ -19,6 +20,9 @@
import './syncbase_table_test.dart' show runTableTests;
main(List args) async {
+ // Enable testing instrumentations.
+ testing.isTesting = true;
+
InitializedApplication app = new InitializedApplication.fromHandle(args[0]);
await app.initialized;