syncbase/dart: Addressing some TODOs, fixing type issues and
addressing comments from past review.
MultiPart: 1/2
Change-Id: I86a69c7aa2707d320e057ad9212b8d546837f46a
diff --git a/dart/bin/syncbase_example.dart b/dart/bin/syncbase_example.dart
index 69e3eee..bd4aeb0 100644
--- a/dart/bin/syncbase_example.dart
+++ b/dart/bin/syncbase_example.dart
@@ -148,9 +148,11 @@
print('syncgroup does not exist, creating it');
var sgSpec = sb.SyncbaseClient.syncGroupSpec(
- description: 'test sync group', perms: openPerms,
+ description: 'test sync group',
+ perms: openPerms,
// Sync the entire table.
- prefixes: ['$tableName:'], mountTables: [mtName]);
+ prefixes: ['$tableName:'],
+ mountTables: [mtName]);
print('SGSPEC = $sgSpec');
diff --git a/dart/lib/gen/dart-gen/mojom/lib/mojo/syncbase.mojom.dart b/dart/lib/gen/dart-gen/mojom/lib/mojo/syncbase.mojom.dart
index 47fa10e..62ae5b9 100644
--- a/dart/lib/gen/dart-gen/mojom/lib/mojo/syncbase.mojom.dart
+++ b/dart/lib/gen/dart-gen/mojom/lib/mojo/syncbase.mojom.dart
@@ -1055,9 +1055,8 @@
class ScanStreamOnKeyValueResponseParams extends bindings.Struct {
static const List<bindings.StructDataHeader> kVersions = const [
- const bindings.StructDataHeader(16, 0)
+ const bindings.StructDataHeader(8, 0)
];
- bool ack = false;
ScanStreamOnKeyValueResponseParams() : super(kVersions.last.size);
@@ -1092,27 +1091,19 @@
'Message newer than the last known version cannot be shorter than '
'required by the last known version.');
}
- if (mainDataHeader.version >= 0) {
-
- result.ack = decoder0.decodeBool(8, 0);
- }
return result;
}
void encode(bindings.Encoder encoder) {
- var encoder0 = encoder.getStructEncoderAtOffset(kVersions.last);
-
- encoder0.encodeBool(ack, 8, 0);
+ encoder.getStructEncoderAtOffset(kVersions.last);
}
String toString() {
- return "ScanStreamOnKeyValueResponseParams("
- "ack: $ack" ")";
+ return "ScanStreamOnKeyValueResponseParams("")";
}
Map toJson() {
Map map = new Map();
- map["ack"] = ack;
return map;
}
}
@@ -1252,9 +1243,8 @@
class WatchGlobStreamOnChangeResponseParams extends bindings.Struct {
static const List<bindings.StructDataHeader> kVersions = const [
- const bindings.StructDataHeader(16, 0)
+ const bindings.StructDataHeader(8, 0)
];
- bool ack = false;
WatchGlobStreamOnChangeResponseParams() : super(kVersions.last.size);
@@ -1289,27 +1279,19 @@
'Message newer than the last known version cannot be shorter than '
'required by the last known version.');
}
- if (mainDataHeader.version >= 0) {
-
- result.ack = decoder0.decodeBool(8, 0);
- }
return result;
}
void encode(bindings.Encoder encoder) {
- var encoder0 = encoder.getStructEncoderAtOffset(kVersions.last);
-
- encoder0.encodeBool(ack, 8, 0);
+ encoder.getStructEncoderAtOffset(kVersions.last);
}
String toString() {
- return "WatchGlobStreamOnChangeResponseParams("
- "ack: $ack" ")";
+ return "WatchGlobStreamOnChangeResponseParams("")";
}
Map toJson() {
Map map = new Map();
- map["ack"] = ack;
return map;
}
}
@@ -7687,9 +7669,8 @@
static const String name = ScanStreamName;
- ScanStreamOnKeyValueResponseParams _ScanStreamOnKeyValueResponseParamsFactory(bool ack) {
+ ScanStreamOnKeyValueResponseParams _ScanStreamOnKeyValueResponseParamsFactory() {
var result = new ScanStreamOnKeyValueResponseParams();
- result.ack = ack;
return result;
}
@@ -7891,9 +7872,8 @@
static const String name = WatchGlobStreamName;
- WatchGlobStreamOnChangeResponseParams _WatchGlobStreamOnChangeResponseParamsFactory(bool ack) {
+ WatchGlobStreamOnChangeResponseParams _WatchGlobStreamOnChangeResponseParamsFactory() {
var result = new WatchGlobStreamOnChangeResponseParams();
- result.ack = ack;
return result;
}
diff --git a/dart/lib/src/nosql/database.dart b/dart/lib/src/nosql/database.dart
index 3a690a6..6fa42bc 100644
--- a/dart/lib/src/nosql/database.dart
+++ b/dart/lib/src/nosql/database.dart
@@ -9,7 +9,7 @@
SyncbaseNoSqlDatabase._internal(
_proxy, _parentFullName, relativeName, batchSuffix)
: super._internal(_proxy, _parentFullName, relativeName,
- naming.join(_parentFullName, escape(relativeName) + batchSuffix));
+ naming.join(_parentFullName, escape(relativeName) + batchSuffix));
// table returns a table with the given relativeName.
SyncbaseTable table(String relativeName) {
@@ -137,18 +137,22 @@
}
}
-class WatchGlobStreamImpl extends Object with StreamFlowControl
+class WatchGlobStreamImpl extends Object
+ with StreamFlowControl
implements mojom.WatchGlobStream {
final StreamController<mojom.WatchChange> sc;
WatchGlobStreamImpl._fromStreamController(this.sc) {
- setupFlowControl(this.sc);
+ initFlowControl(this.sc);
}
- Future<mojom.WatchGlobStreamOnChangeResponseParams> onChange(
- mojom.WatchChange change, Function resultFactory) {
-
- // Testing instrumentations for testing flow control.
+ Future onChange(mojom.WatchChange change, [Function newAck = null]) {
+ // NOTE(aghassemi): We need to make newAck optional to match mojo's
+ // define class, but newAck is always provided by mojo when called.
+ if (newAck == null) {
+ throw new ArgumentError('newAck can not be null');
+ }
+ // Testing instrumentation for testing flow control.
if (testing.isTesting) {
testing.DatabaseWatch.onChangeCounter.increment();
}
@@ -156,13 +160,8 @@
sc.add(change);
// Only ack after we get unlocked.
- // If we are not locked, onNextUnlock completes immediately.
- var ack = onNextUnlock().then((_) {
-
- // Send an ack back to server.
- return new Future.value(resultFactory(true));
- });
- return ack;
+ // If we are not locked, onNextUnlock returns immediately.
+ return onNextUnlock().then((_) => newAck());
}
// Called by the mojo proxy when the Go function call returns.
diff --git a/dart/lib/src/nosql/row.dart b/dart/lib/src/nosql/row.dart
index 5317709..293fe5a 100644
--- a/dart/lib/src/nosql/row.dart
+++ b/dart/lib/src/nosql/row.dart
@@ -9,7 +9,7 @@
SyncbaseRow._internal(_proxy, _parentFullName, key)
: super._internal(_proxy, _parentFullName, key,
- naming.join(_parentFullName, escape(key))),
+ naming.join(_parentFullName, escape(key))),
this.key = key;
Future<bool> exists() async {
diff --git a/dart/lib/src/nosql/table.dart b/dart/lib/src/nosql/table.dart
index 83a1903..c3f0e92 100644
--- a/dart/lib/src/nosql/table.dart
+++ b/dart/lib/src/nosql/table.dart
@@ -7,7 +7,7 @@
class SyncbaseTable extends NamedResource {
SyncbaseTable._internal(_proxy, _parentFullName, relativeName)
: super._internal(_proxy, _parentFullName, relativeName,
- naming.join(_parentFullName, escape(relativeName)));
+ naming.join(_parentFullName, escape(relativeName)));
// row returns a row with the given key.
SyncbaseRow row(String key) {
@@ -97,26 +97,27 @@
}
}
-class ScanStreamImpl extends Object with StreamFlowControl
+class ScanStreamImpl extends Object
+ with StreamFlowControl
implements mojom.ScanStream {
final StreamController<mojom.KeyValue> sc;
ScanStreamImpl._fromStreamController(this.sc) {
- setupFlowControl(this.sc);
+ initFlowControl(this.sc);
}
- Future<mojom.ScanStreamOnChangeResponseParams> onKeyValue(
- mojom.KeyValue keyValue, Function resultFactory) {
+ Future onKeyValue(mojom.KeyValue keyValue, [Function newAck = null]) {
+ // NOTE(aghassemi): We need to make newAck optional to match mojo's
+ // define class, but newAck is always provided by mojo when called.
+ if (newAck == null) {
+ throw new ArgumentError('newAck can not be null');
+ }
+
sc.add(keyValue);
// Only ack after we get unlocked.
- // If we are not locked, onNextUnlock completes immediately.
- var ack = onNextUnlock().then((_) {
-
- // Send an ack back to server.
- return new Future.value(resultFactory(true));
- });
- return ack;
+ // If we are not locked, onNextUnlock returns immediately.
+ return onNextUnlock().then((_) => newAck());
}
// Called by the mojom proxy when the Go function call returns.
diff --git a/dart/lib/src/stream_flow_control.dart b/dart/lib/src/stream_flow_control.dart
index e62d3b6..342c2fb 100644
--- a/dart/lib/src/stream_flow_control.dart
+++ b/dart/lib/src/stream_flow_control.dart
@@ -4,17 +4,17 @@
part of syncbase_client;
-// StreamFlowControl is a mixin that exposes methods setupFlowControl() and
+// StreamFlowControl is a mixin that exposes methods initFlowControl() and
// onNextUnlock() to allow mixers add flow control to their stream controllers.
class StreamFlowControl {
// We are in "locked" state if and only if _mutex is not null.
// We are locked by default until we gain the first subscriber.
- Future _mutex = new Completer();
+ Completer _mutex = new Completer();
// Setup flow control by adding listeners to the stream controller.
// Mixin classes can't have constructors, so this is just a method that should
// be called from mixer's constructor.
- setupFlowControl(StreamController sc) {
+ initFlowControl(StreamController sc) {
// Unlock when gaining the first subscriber.
sc.onListen = _unlock;
// Lock when losing the last subscriber.
diff --git a/dart/lib/src/testing_instrumentation.dart b/dart/lib/src/testing_instrumentation.dart
new file mode 100644
index 0000000..a9ce2e4
--- /dev/null
+++ b/dart/lib/src/testing_instrumentation.dart
@@ -0,0 +1,33 @@
+// Copyright 2015 The Vanadium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+// This library facilitates communication between implementation and test code.
+// This avoids the need for a complex dependency injection setup just to test
+// specific bits of implementation.
+library testing_instrumentation;
+
+// Set to true by test runner; implementation should check this variable before
+// touching other state in this module.
+bool isTesting = false;
+
+// Testing instrumentation for database watch implementation.
+abstract class DatabaseWatch {
+ // Allows database watch implementation to communicate how many changes
+ // it has received from the server.
+ static Counter onChangeCounter = new Counter._internal();
+}
+
+// A generic counter.
+class Counter {
+ num count = 0;
+ Counter._internal();
+
+ increment() {
+ count++;
+ }
+
+ reset() {
+ count = 0;
+ }
+}
diff --git a/dart/lib/src/testing_instrumentations.dart b/dart/lib/src/testing_instrumentations.dart
deleted file mode 100644
index 00a1c2e..0000000
--- a/dart/lib/src/testing_instrumentations.dart
+++ /dev/null
@@ -1,36 +0,0 @@
-// Copyright 2015 The Vanadium Authors. All rights reserved.
-// Use of this source code is governed by a BSD-style
-// license that can be found in the LICENSE file.
-
-// This library provides some globals that can be accessed from tests and
-// implementation code so they can communicate certain private states to each
-// other enabling otherwise hard-to-do end-to-end tests.
-// Although mocking is a preferred approach, to allow mocking of private classes
-// deep in the stack, we would need to implement a full dependency injection
-// solution since Dart does not allow mocking libraries and files merely based
-// on import path.
-library testing_instrumentations;
-
-// To be set to true by test runner, any implementation code that needs to
-// instrument for testing, must check this first.
-bool isTesting = false;
-
-// Testing instrumentations for Database Watch implementation.
-abstract class DatabaseWatch {
- // Allows Database Watch implementation to communicate how many changes
- // it has received from the server.
- static _Counter onChangeCounter = new Counter._internal();
-}
-
-// A generic counter.
-class Counter {
- num count = 0;
- Counter._internal();
-
- increment() {
- count++;
- }
- reset() {
- count = 0;
- }
-}
diff --git a/dart/lib/syncbase_client.dart b/dart/lib/syncbase_client.dart
index f81b4cd..473e087 100644
--- a/dart/lib/syncbase_client.dart
+++ b/dart/lib/syncbase_client.dart
@@ -9,7 +9,7 @@
import 'package:mojo/bindings.dart' as bindings;
import 'package:ether/src/naming/util.dart' as naming;
-import 'package:ether/src/testing_instrumentations.dart' as testing;
+import 'package:ether/src/testing_instrumentation.dart' as testing;
import 'gen/dart-gen/mojom/lib/mojo/syncbase.mojom.dart' as mojom;
diff --git a/dart/test/integration/syncbase_database_test.dart b/dart/test/integration/syncbase_database_test.dart
index cd00f45..6afae8c 100644
--- a/dart/test/integration/syncbase_database_test.dart
+++ b/dart/test/integration/syncbase_database_test.dart
@@ -9,7 +9,7 @@
import 'package:test/test.dart';
-import 'package:ether/src/testing_instrumentations.dart' as testing;
+import 'package:ether/src/testing_instrumentation.dart' as testing;
import 'package:ether/syncbase_client.dart'
show SyncbaseClient, WatchChangeTypes, WatchChange, WatchGlobStreamImpl;
@@ -119,7 +119,7 @@
// Listen for the data on the stream.
var allExpectedChangesReceived = new Completer();
- Function onData(_) {
+ onData(_) {
if (testing.DatabaseWatch.onChangeCounter.count == numOperations) {
allExpectedChangesReceived.complete();
}
diff --git a/dart/test/integration/syncbase_test.dart b/dart/test/integration/syncbase_test.dart
index b8a9002..6941873 100755
--- a/dart/test/integration/syncbase_test.dart
+++ b/dart/test/integration/syncbase_test.dart
@@ -8,7 +8,7 @@
import 'package:mojo/core.dart' show MojoHandle;
import 'package:test/test.dart';
-import 'package:ether/src/testing_instrumentations.dart' as testing;
+import 'package:ether/src/testing_instrumentation.dart' as testing;
import 'package:ether/initialized_application.dart' show InitializedApplication;
import 'package:ether/syncbase_client.dart' show SyncbaseClient;
@@ -20,7 +20,7 @@
import './syncbase_table_test.dart' show runTableTests;
main(List args) async {
- // Enable testing instrumentations.
+ // Enable testing instrumentation.
testing.isTesting = true;
InitializedApplication app = new InitializedApplication.fromHandle(args[0]);
diff --git a/mojom/syncbase.mojom b/mojom/syncbase.mojom
index 5a7387a..b8e7eea 100644
--- a/mojom/syncbase.mojom
+++ b/mojom/syncbase.mojom
@@ -61,8 +61,7 @@
// an ack back from client before sending the next KeyValue, effectively
// creating a mechanism for flow control.
interface ScanStream {
- // TODO(aghassemi): Same TODO as WatchGlobStream.OnChange
- OnKeyValue(KeyValue key_value) => (bool ack);
+ OnKeyValue(KeyValue key_value) => ();
OnReturn(Error err);
};
@@ -86,12 +85,7 @@
// an ack back from client before sending the next change, effectively creating
// a mechanism for flow control.
interface WatchGlobStream {
- // TODO(aghassemi): We should be able to do => () instead of => (bool ack)
- // but mojom Go compiler has a bug where it does not differentiate between
- // no return and => () therefore call does not actually block until callback
- // is called. This is a bug and for now we get around it by returning a bool.
- // See https://github.com/vanadium/issues/issues/749 for details.
- OnChange(WatchChange change) => (bool ack);
+ OnChange(WatchChange change) => ();
OnError(Error err);
};