syncbase/dart: Addressing some TODOs, fixing type issues and
addressing comments from past review.

MultiPart: 1/2
Change-Id: I86a69c7aa2707d320e057ad9212b8d546837f46a
diff --git a/dart/bin/syncbase_example.dart b/dart/bin/syncbase_example.dart
index 69e3eee..bd4aeb0 100644
--- a/dart/bin/syncbase_example.dart
+++ b/dart/bin/syncbase_example.dart
@@ -148,9 +148,11 @@
     print('syncgroup does not exist, creating it');
 
     var sgSpec = sb.SyncbaseClient.syncGroupSpec(
-        description: 'test sync group', perms: openPerms,
+        description: 'test sync group',
+        perms: openPerms,
         // Sync the entire table.
-        prefixes: ['$tableName:'], mountTables: [mtName]);
+        prefixes: ['$tableName:'],
+        mountTables: [mtName]);
 
     print('SGSPEC = $sgSpec');
 
diff --git a/dart/lib/gen/dart-gen/mojom/lib/mojo/syncbase.mojom.dart b/dart/lib/gen/dart-gen/mojom/lib/mojo/syncbase.mojom.dart
index 47fa10e..62ae5b9 100644
--- a/dart/lib/gen/dart-gen/mojom/lib/mojo/syncbase.mojom.dart
+++ b/dart/lib/gen/dart-gen/mojom/lib/mojo/syncbase.mojom.dart
@@ -1055,9 +1055,8 @@
 
 class ScanStreamOnKeyValueResponseParams extends bindings.Struct {
   static const List<bindings.StructDataHeader> kVersions = const [
-    const bindings.StructDataHeader(16, 0)
+    const bindings.StructDataHeader(8, 0)
   ];
-  bool ack = false;
 
   ScanStreamOnKeyValueResponseParams() : super(kVersions.last.size);
 
@@ -1092,27 +1091,19 @@
         'Message newer than the last known version cannot be shorter than '
         'required by the last known version.');
     }
-    if (mainDataHeader.version >= 0) {
-      
-      result.ack = decoder0.decodeBool(8, 0);
-    }
     return result;
   }
 
   void encode(bindings.Encoder encoder) {
-    var encoder0 = encoder.getStructEncoderAtOffset(kVersions.last);
-    
-    encoder0.encodeBool(ack, 8, 0);
+    encoder.getStructEncoderAtOffset(kVersions.last);
   }
 
   String toString() {
-    return "ScanStreamOnKeyValueResponseParams("
-           "ack: $ack" ")";
+    return "ScanStreamOnKeyValueResponseParams("")";
   }
 
   Map toJson() {
     Map map = new Map();
-    map["ack"] = ack;
     return map;
   }
 }
@@ -1252,9 +1243,8 @@
 
 class WatchGlobStreamOnChangeResponseParams extends bindings.Struct {
   static const List<bindings.StructDataHeader> kVersions = const [
-    const bindings.StructDataHeader(16, 0)
+    const bindings.StructDataHeader(8, 0)
   ];
-  bool ack = false;
 
   WatchGlobStreamOnChangeResponseParams() : super(kVersions.last.size);
 
@@ -1289,27 +1279,19 @@
         'Message newer than the last known version cannot be shorter than '
         'required by the last known version.');
     }
-    if (mainDataHeader.version >= 0) {
-      
-      result.ack = decoder0.decodeBool(8, 0);
-    }
     return result;
   }
 
   void encode(bindings.Encoder encoder) {
-    var encoder0 = encoder.getStructEncoderAtOffset(kVersions.last);
-    
-    encoder0.encodeBool(ack, 8, 0);
+    encoder.getStructEncoderAtOffset(kVersions.last);
   }
 
   String toString() {
-    return "WatchGlobStreamOnChangeResponseParams("
-           "ack: $ack" ")";
+    return "WatchGlobStreamOnChangeResponseParams("")";
   }
 
   Map toJson() {
     Map map = new Map();
-    map["ack"] = ack;
     return map;
   }
 }
@@ -7687,9 +7669,8 @@
   static const String name = ScanStreamName;
 
 
-  ScanStreamOnKeyValueResponseParams _ScanStreamOnKeyValueResponseParamsFactory(bool ack) {
+  ScanStreamOnKeyValueResponseParams _ScanStreamOnKeyValueResponseParamsFactory() {
     var result = new ScanStreamOnKeyValueResponseParams();
-    result.ack = ack;
     return result;
   }
 
@@ -7891,9 +7872,8 @@
   static const String name = WatchGlobStreamName;
 
 
-  WatchGlobStreamOnChangeResponseParams _WatchGlobStreamOnChangeResponseParamsFactory(bool ack) {
+  WatchGlobStreamOnChangeResponseParams _WatchGlobStreamOnChangeResponseParamsFactory() {
     var result = new WatchGlobStreamOnChangeResponseParams();
-    result.ack = ack;
     return result;
   }
 
diff --git a/dart/lib/src/nosql/database.dart b/dart/lib/src/nosql/database.dart
index 3a690a6..6fa42bc 100644
--- a/dart/lib/src/nosql/database.dart
+++ b/dart/lib/src/nosql/database.dart
@@ -9,7 +9,7 @@
   SyncbaseNoSqlDatabase._internal(
       _proxy, _parentFullName, relativeName, batchSuffix)
       : super._internal(_proxy, _parentFullName, relativeName,
-          naming.join(_parentFullName, escape(relativeName) + batchSuffix));
+            naming.join(_parentFullName, escape(relativeName) + batchSuffix));
 
   // table returns a table with the given relativeName.
   SyncbaseTable table(String relativeName) {
@@ -137,18 +137,22 @@
   }
 }
 
-class WatchGlobStreamImpl extends Object with StreamFlowControl
+class WatchGlobStreamImpl extends Object
+    with StreamFlowControl
     implements mojom.WatchGlobStream {
   final StreamController<mojom.WatchChange> sc;
 
   WatchGlobStreamImpl._fromStreamController(this.sc) {
-    setupFlowControl(this.sc);
+    initFlowControl(this.sc);
   }
 
-  Future<mojom.WatchGlobStreamOnChangeResponseParams> onChange(
-      mojom.WatchChange change, Function resultFactory) {
-
-    // Testing instrumentations for testing flow control.
+  Future onChange(mojom.WatchChange change, [Function newAck = null]) {
+    // NOTE(aghassemi): We need to make newAck optional to match mojo's
+    // define class, but newAck is always provided by mojo when called.
+    if (newAck == null) {
+      throw new ArgumentError('newAck can not be null');
+    }
+    // Testing instrumentation for testing flow control.
     if (testing.isTesting) {
       testing.DatabaseWatch.onChangeCounter.increment();
     }
@@ -156,13 +160,8 @@
     sc.add(change);
 
     // Only ack after we get unlocked.
-    // If we are not locked, onNextUnlock completes immediately.
-    var ack = onNextUnlock().then((_) {
-
-      // Send an ack back to server.
-      return new Future.value(resultFactory(true));
-    });
-    return ack;
+    // If we are not locked, onNextUnlock returns immediately.
+    return onNextUnlock().then((_) => newAck());
   }
 
   // Called by the mojo proxy when the Go function call returns.
diff --git a/dart/lib/src/nosql/row.dart b/dart/lib/src/nosql/row.dart
index 5317709..293fe5a 100644
--- a/dart/lib/src/nosql/row.dart
+++ b/dart/lib/src/nosql/row.dart
@@ -9,7 +9,7 @@
 
   SyncbaseRow._internal(_proxy, _parentFullName, key)
       : super._internal(_proxy, _parentFullName, key,
-          naming.join(_parentFullName, escape(key))),
+            naming.join(_parentFullName, escape(key))),
         this.key = key;
 
   Future<bool> exists() async {
diff --git a/dart/lib/src/nosql/table.dart b/dart/lib/src/nosql/table.dart
index 83a1903..c3f0e92 100644
--- a/dart/lib/src/nosql/table.dart
+++ b/dart/lib/src/nosql/table.dart
@@ -7,7 +7,7 @@
 class SyncbaseTable extends NamedResource {
   SyncbaseTable._internal(_proxy, _parentFullName, relativeName)
       : super._internal(_proxy, _parentFullName, relativeName,
-          naming.join(_parentFullName, escape(relativeName)));
+            naming.join(_parentFullName, escape(relativeName)));
 
   // row returns a row with the given key.
   SyncbaseRow row(String key) {
@@ -97,26 +97,27 @@
   }
 }
 
-class ScanStreamImpl extends Object with StreamFlowControl
+class ScanStreamImpl extends Object
+    with StreamFlowControl
     implements mojom.ScanStream {
   final StreamController<mojom.KeyValue> sc;
 
   ScanStreamImpl._fromStreamController(this.sc) {
-    setupFlowControl(this.sc);
+    initFlowControl(this.sc);
   }
 
-  Future<mojom.ScanStreamOnChangeResponseParams> onKeyValue(
-      mojom.KeyValue keyValue, Function resultFactory) {
+  Future onKeyValue(mojom.KeyValue keyValue, [Function newAck = null]) {
+    // NOTE(aghassemi): We need to make newAck optional to match mojo's
+    // define class, but newAck is always provided by mojo when called.
+    if (newAck == null) {
+      throw new ArgumentError('newAck can not be null');
+    }
+
     sc.add(keyValue);
 
     // Only ack after we get unlocked.
-    // If we are not locked, onNextUnlock completes immediately.
-    var ack = onNextUnlock().then((_) {
-
-      // Send an ack back to server.
-      return new Future.value(resultFactory(true));
-    });
-    return ack;
+    // If we are not locked, onNextUnlock returns immediately.
+    return onNextUnlock().then((_) => newAck());
   }
 
   // Called by the mojom proxy when the Go function call returns.
diff --git a/dart/lib/src/stream_flow_control.dart b/dart/lib/src/stream_flow_control.dart
index e62d3b6..342c2fb 100644
--- a/dart/lib/src/stream_flow_control.dart
+++ b/dart/lib/src/stream_flow_control.dart
@@ -4,17 +4,17 @@
 
 part of syncbase_client;
 
-// StreamFlowControl is a mixin that exposes methods setupFlowControl() and
+// StreamFlowControl is a mixin that exposes methods initFlowControl() and
 // onNextUnlock() to allow mixers add flow control to their stream controllers.
 class StreamFlowControl {
   // We are in "locked" state if and only if _mutex is not null.
   // We are locked by default until we gain the first subscriber.
-  Future _mutex = new Completer();
+  Completer _mutex = new Completer();
 
   // Setup flow control by adding listeners to the stream controller.
   // Mixin classes can't have constructors, so this is just a method that should
   // be called from mixer's constructor.
-  setupFlowControl(StreamController sc) {
+  initFlowControl(StreamController sc) {
     // Unlock when gaining the first subscriber.
     sc.onListen = _unlock;
     // Lock when losing the last subscriber.
diff --git a/dart/lib/src/testing_instrumentation.dart b/dart/lib/src/testing_instrumentation.dart
new file mode 100644
index 0000000..a9ce2e4
--- /dev/null
+++ b/dart/lib/src/testing_instrumentation.dart
@@ -0,0 +1,33 @@
+// Copyright 2015 The Vanadium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+// This library facilitates communication between implementation and test code.
+// This avoids the need for a complex dependency injection setup just to test
+// specific bits of implementation.
+library testing_instrumentation;
+
+// Set to true by test runner; implementation should check this variable before
+// touching other state in this module.
+bool isTesting = false;
+
+// Testing instrumentation for database watch implementation.
+abstract class DatabaseWatch {
+  // Allows database watch implementation to communicate how many changes
+  // it has received from the server.
+  static Counter onChangeCounter = new Counter._internal();
+}
+
+// A generic counter.
+class Counter {
+  num count = 0;
+  Counter._internal();
+
+  increment() {
+    count++;
+  }
+
+  reset() {
+    count = 0;
+  }
+}
diff --git a/dart/lib/src/testing_instrumentations.dart b/dart/lib/src/testing_instrumentations.dart
deleted file mode 100644
index 00a1c2e..0000000
--- a/dart/lib/src/testing_instrumentations.dart
+++ /dev/null
@@ -1,36 +0,0 @@
-// Copyright 2015 The Vanadium Authors. All rights reserved.
-// Use of this source code is governed by a BSD-style
-// license that can be found in the LICENSE file.
-
-// This library provides some globals that can be accessed from tests and
-// implementation code so they can communicate certain private states to each
-// other enabling otherwise hard-to-do end-to-end tests.
-// Although mocking is a preferred approach, to allow mocking of private classes
-// deep in the stack, we would need to implement a full dependency injection
-// solution since Dart does not allow mocking libraries and files merely based
-// on import path.
-library testing_instrumentations;
-
-// To be set to true by test runner, any implementation code that needs to
-// instrument for testing, must check this first.
-bool isTesting = false;
-
-// Testing instrumentations for Database Watch implementation.
-abstract class DatabaseWatch {
-  // Allows Database Watch implementation to communicate how many changes
-  // it has received from the server.
-  static _Counter onChangeCounter = new Counter._internal();
-}
-
-// A generic counter.
-class Counter {
-  num count = 0;
-  Counter._internal();
-
-  increment() {
-    count++;
-  }
-  reset() {
-    count = 0;
-  }
-}
diff --git a/dart/lib/syncbase_client.dart b/dart/lib/syncbase_client.dart
index f81b4cd..473e087 100644
--- a/dart/lib/syncbase_client.dart
+++ b/dart/lib/syncbase_client.dart
@@ -9,7 +9,7 @@
 
 import 'package:mojo/bindings.dart' as bindings;
 import 'package:ether/src/naming/util.dart' as naming;
-import 'package:ether/src/testing_instrumentations.dart' as testing;
+import 'package:ether/src/testing_instrumentation.dart' as testing;
 
 import 'gen/dart-gen/mojom/lib/mojo/syncbase.mojom.dart' as mojom;
 
diff --git a/dart/test/integration/syncbase_database_test.dart b/dart/test/integration/syncbase_database_test.dart
index cd00f45..6afae8c 100644
--- a/dart/test/integration/syncbase_database_test.dart
+++ b/dart/test/integration/syncbase_database_test.dart
@@ -9,7 +9,7 @@
 
 import 'package:test/test.dart';
 
-import 'package:ether/src/testing_instrumentations.dart' as testing;
+import 'package:ether/src/testing_instrumentation.dart' as testing;
 import 'package:ether/syncbase_client.dart'
     show SyncbaseClient, WatchChangeTypes, WatchChange, WatchGlobStreamImpl;
 
@@ -119,7 +119,7 @@
 
     // Listen for the data on the stream.
     var allExpectedChangesReceived = new Completer();
-    Function onData(_) {
+    onData(_) {
       if (testing.DatabaseWatch.onChangeCounter.count == numOperations) {
         allExpectedChangesReceived.complete();
       }
diff --git a/dart/test/integration/syncbase_test.dart b/dart/test/integration/syncbase_test.dart
index b8a9002..6941873 100755
--- a/dart/test/integration/syncbase_test.dart
+++ b/dart/test/integration/syncbase_test.dart
@@ -8,7 +8,7 @@
 import 'package:mojo/core.dart' show MojoHandle;
 import 'package:test/test.dart';
 
-import 'package:ether/src/testing_instrumentations.dart' as testing;
+import 'package:ether/src/testing_instrumentation.dart' as testing;
 import 'package:ether/initialized_application.dart' show InitializedApplication;
 import 'package:ether/syncbase_client.dart' show SyncbaseClient;
 
@@ -20,7 +20,7 @@
 import './syncbase_table_test.dart' show runTableTests;
 
 main(List args) async {
-  // Enable testing instrumentations.
+  // Enable testing instrumentation.
   testing.isTesting = true;
 
   InitializedApplication app = new InitializedApplication.fromHandle(args[0]);
diff --git a/mojom/syncbase.mojom b/mojom/syncbase.mojom
index 5a7387a..b8e7eea 100644
--- a/mojom/syncbase.mojom
+++ b/mojom/syncbase.mojom
@@ -61,8 +61,7 @@
 // an ack back from client before sending the next KeyValue, effectively
 // creating a mechanism for flow control.
 interface ScanStream {
-  // TODO(aghassemi): Same TODO as WatchGlobStream.OnChange
-  OnKeyValue(KeyValue key_value) => (bool ack);
+  OnKeyValue(KeyValue key_value) => ();
   OnReturn(Error err);
 };
 
@@ -86,12 +85,7 @@
 // an ack back from client before sending the next change, effectively creating
 // a mechanism for flow control.
 interface WatchGlobStream {
-  // TODO(aghassemi): We should be able to do => () instead of => (bool ack)
-  // but mojom Go compiler has a bug where it does not differentiate between
-  // no return and => () therefore call does not actually block until callback
-  // is called. This is a bug and for now we get around it by returning a bool.
-  // See https://github.com/vanadium/issues/issues/749 for details.
-  OnChange(WatchChange change) => (bool ack);
+  OnChange(WatchChange change) => ();
   OnError(Error err);
 };