mojo/syncbase: Implementing Exec with flow control.

MultiPart: 2/2
Change-Id: Ieda02c379960f6dbe6e86f72bffe10e64b4fa101
diff --git a/dart/lib/gen/dart-gen/mojom/lib/mojo/syncbase.mojom.dart b/dart/lib/gen/dart-gen/mojom/lib/mojo/syncbase.mojom.dart
index 6e420bf..04da065 100644
--- a/dart/lib/gen/dart-gen/mojom/lib/mojo/syncbase.mojom.dart
+++ b/dart/lib/gen/dart-gen/mojom/lib/mojo/syncbase.mojom.dart
@@ -996,26 +996,82 @@
 }
 
 
-class ExecStreamOnReturnParams extends bindings.Struct {
+class ExecStreamOnResultResponseParams extends bindings.Struct {
   static const List<bindings.StructDataHeader> kVersions = const [
-    const bindings.StructDataHeader(16, 0)
+    const bindings.StructDataHeader(8, 0)
   ];
-  Error err = null;
 
-  ExecStreamOnReturnParams() : super(kVersions.last.size);
+  ExecStreamOnResultResponseParams() : super(kVersions.last.size);
 
-  static ExecStreamOnReturnParams deserialize(bindings.Message message) {
+  static ExecStreamOnResultResponseParams deserialize(bindings.Message message) {
     var decoder = new bindings.Decoder(message);
     var result = decode(decoder);
     decoder.excessHandles.forEach((h) => h.close());
     return result;
   }
 
-  static ExecStreamOnReturnParams decode(bindings.Decoder decoder0) {
+  static ExecStreamOnResultResponseParams decode(bindings.Decoder decoder0) {
     if (decoder0 == null) {
       return null;
     }
-    ExecStreamOnReturnParams result = new ExecStreamOnReturnParams();
+    ExecStreamOnResultResponseParams result = new ExecStreamOnResultResponseParams();
+
+    var mainDataHeader = decoder0.decodeStructDataHeader();
+    if (mainDataHeader.version <= kVersions.last.version) {
+      // Scan in reverse order to optimize for more recent versions.
+      for (int i = kVersions.length - 1; i >= 0; --i) {
+        if (mainDataHeader.version >= kVersions[i].version) {
+          if (mainDataHeader.size == kVersions[i].size) {
+            // Found a match.
+            break;
+          }
+          throw new bindings.MojoCodecError(
+              'Header size doesn\'t correspond to known version size.');
+        }
+      }
+    } else if (mainDataHeader.size < kVersions.last.size) {
+      throw new bindings.MojoCodecError(
+        'Message newer than the last known version cannot be shorter than '
+        'required by the last known version.');
+    }
+    return result;
+  }
+
+  void encode(bindings.Encoder encoder) {
+    encoder.getStructEncoderAtOffset(kVersions.last);
+  }
+
+  String toString() {
+    return "ExecStreamOnResultResponseParams("")";
+  }
+
+  Map toJson() {
+    Map map = new Map();
+    return map;
+  }
+}
+
+
+class ExecStreamOnDoneParams extends bindings.Struct {
+  static const List<bindings.StructDataHeader> kVersions = const [
+    const bindings.StructDataHeader(16, 0)
+  ];
+  Error err = null;
+
+  ExecStreamOnDoneParams() : super(kVersions.last.size);
+
+  static ExecStreamOnDoneParams deserialize(bindings.Message message) {
+    var decoder = new bindings.Decoder(message);
+    var result = decode(decoder);
+    decoder.excessHandles.forEach((h) => h.close());
+    return result;
+  }
+
+  static ExecStreamOnDoneParams decode(bindings.Decoder decoder0) {
+    if (decoder0 == null) {
+      return null;
+    }
+    ExecStreamOnDoneParams result = new ExecStreamOnDoneParams();
 
     var mainDataHeader = decoder0.decodeStructDataHeader();
     if (mainDataHeader.version <= kVersions.last.version) {
@@ -1050,7 +1106,7 @@
   }
 
   String toString() {
-    return "ExecStreamOnReturnParams("
+    return "ExecStreamOnDoneParams("
            "err: $err" ")";
   }
 
@@ -1184,26 +1240,26 @@
 }
 
 
-class ScanStreamOnReturnParams extends bindings.Struct {
+class ScanStreamOnDoneParams extends bindings.Struct {
   static const List<bindings.StructDataHeader> kVersions = const [
     const bindings.StructDataHeader(16, 0)
   ];
   Error err = null;
 
-  ScanStreamOnReturnParams() : super(kVersions.last.size);
+  ScanStreamOnDoneParams() : super(kVersions.last.size);
 
-  static ScanStreamOnReturnParams deserialize(bindings.Message message) {
+  static ScanStreamOnDoneParams deserialize(bindings.Message message) {
     var decoder = new bindings.Decoder(message);
     var result = decode(decoder);
     decoder.excessHandles.forEach((h) => h.close());
     return result;
   }
 
-  static ScanStreamOnReturnParams decode(bindings.Decoder decoder0) {
+  static ScanStreamOnDoneParams decode(bindings.Decoder decoder0) {
     if (decoder0 == null) {
       return null;
     }
-    ScanStreamOnReturnParams result = new ScanStreamOnReturnParams();
+    ScanStreamOnDoneParams result = new ScanStreamOnDoneParams();
 
     var mainDataHeader = decoder0.decodeStructDataHeader();
     if (mainDataHeader.version <= kVersions.last.version) {
@@ -1238,7 +1294,7 @@
   }
 
   String toString() {
-    return "ScanStreamOnReturnParams("
+    return "ScanStreamOnDoneParams("
            "err: $err" ")";
   }
 
@@ -7420,14 +7476,14 @@
 }
 
 const int kExecStream_onResult_name = 0;
-const int kExecStream_onReturn_name = 1;
+const int kExecStream_onDone_name = 1;
 
 const String ExecStreamName =
       'mojo::ExecStream';
 
 abstract class ExecStream {
-  void onResult(Result result);
-  void onReturn(Error err);
+  Future<ExecStreamOnResultResponseParams> onResult(Result result,[Function responseFactory = null]);
+  void onDone(Error err);
 
 }
 
@@ -7451,6 +7507,20 @@
 
   void handleResponse(bindings.ServiceMessage message) {
     switch (message.header.type) {
+      case kExecStream_onResult_name:
+        var r = ExecStreamOnResultResponseParams.deserialize(
+            message.payload);
+        if (!message.header.hasRequestId) {
+          throw 'Expected a message with a valid request Id.';
+        }
+        Completer c = completerMap[message.header.requestId];
+        if (c == null) {
+          throw 'Message had unknown request Id: ${message.header.requestId}';
+        }
+        completerMap.remove(message.header.requestId);
+        assert(!c.isCompleted);
+        c.complete(r);
+        break;
       default:
         throw new bindings.MojoCodecError("Unexpected message name");
         break;
@@ -7468,18 +7538,21 @@
   ExecStreamProxyImpl _proxyImpl;
 
   _ExecStreamProxyCalls(this._proxyImpl);
-    void onResult(Result result) {
+    Future<ExecStreamOnResultResponseParams> onResult(Result result,[Function responseFactory = null]) {
       assert(_proxyImpl.isBound);
       var params = new ExecStreamOnResultParams();
       params.result = result;
-      _proxyImpl.sendMessage(params, kExecStream_onResult_name);
+      return _proxyImpl.sendMessageWithRequestId(
+          params,
+          kExecStream_onResult_name,
+          -1,
+          bindings.MessageHeader.kMessageExpectsResponse);
     }
-  
-    void onReturn(Error err) {
+    void onDone(Error err) {
       assert(_proxyImpl.isBound);
-      var params = new ExecStreamOnReturnParams();
+      var params = new ExecStreamOnDoneParams();
       params.err = err;
-      _proxyImpl.sendMessage(params, kExecStream_onReturn_name);
+      _proxyImpl.sendMessage(params, kExecStream_onDone_name);
     }
   
 }
@@ -7553,6 +7626,10 @@
   static const String name = ExecStreamName;
 
 
+  ExecStreamOnResultResponseParams _ExecStreamOnResultResponseParamsFactory() {
+    var result = new ExecStreamOnResultResponseParams();
+    return result;
+  }
 
   Future<bindings.Message> handleMessage(bindings.ServiceMessage message) {
     if (bindings.ControlMessageHandler.isControlMessage(message)) {
@@ -7565,12 +7642,20 @@
       case kExecStream_onResult_name:
         var params = ExecStreamOnResultParams.deserialize(
             message.payload);
-        _impl.onResult(params.result);
+        return _impl.onResult(params.result,_ExecStreamOnResultResponseParamsFactory).then((response) {
+          if (response != null) {
+            return buildResponseWithId(
+                response,
+                kExecStream_onResult_name,
+                message.header.requestId,
+                bindings.MessageHeader.kMessageIsResponse);
+          }
+        });
         break;
-      case kExecStream_onReturn_name:
-        var params = ExecStreamOnReturnParams.deserialize(
+      case kExecStream_onDone_name:
+        var params = ExecStreamOnDoneParams.deserialize(
             message.payload);
-        _impl.onReturn(params.err);
+        _impl.onDone(params.err);
         break;
       default:
         throw new bindings.MojoCodecError("Unexpected message name");
@@ -7594,14 +7679,14 @@
 }
 
 const int kScanStream_onKeyValue_name = 0;
-const int kScanStream_onReturn_name = 1;
+const int kScanStream_onDone_name = 1;
 
 const String ScanStreamName =
       'mojo::ScanStream';
 
 abstract class ScanStream {
   Future<ScanStreamOnKeyValueResponseParams> onKeyValue(KeyValue keyValue,[Function responseFactory = null]);
-  void onReturn(Error err);
+  void onDone(Error err);
 
 }
 
@@ -7666,11 +7751,11 @@
           -1,
           bindings.MessageHeader.kMessageExpectsResponse);
     }
-    void onReturn(Error err) {
+    void onDone(Error err) {
       assert(_proxyImpl.isBound);
-      var params = new ScanStreamOnReturnParams();
+      var params = new ScanStreamOnDoneParams();
       params.err = err;
-      _proxyImpl.sendMessage(params, kScanStream_onReturn_name);
+      _proxyImpl.sendMessage(params, kScanStream_onDone_name);
     }
   
 }
@@ -7770,10 +7855,10 @@
           }
         });
         break;
-      case kScanStream_onReturn_name:
-        var params = ScanStreamOnReturnParams.deserialize(
+      case kScanStream_onDone_name:
+        var params = ScanStreamOnDoneParams.deserialize(
             message.payload);
-        _impl.onReturn(params.err);
+        _impl.onDone(params.err);
         break;
       default:
         throw new bindings.MojoCodecError("Unexpected message name");
diff --git a/dart/lib/src/nosql/database.dart b/dart/lib/src/nosql/database.dart
index 6fa42bc..3649552 100644
--- a/dart/lib/src/nosql/database.dart
+++ b/dart/lib/src/nosql/database.dart
@@ -45,17 +45,13 @@
 
   Stream<mojom.Result> exec(String query) {
     StreamController<mojom.Result> sc = new StreamController();
-    mojom.ExecStream execStream = new ExecStreamImpl._fromStreamController(sc);
+
+    mojom.ExecStreamStub stub = new mojom.ExecStreamStub.unbound();
+    stub.impl = new ExecStreamImpl._fromStreamController(sc);
 
     // Call dbExec asynchronously.
-    _proxy.ptr.dbExec(fullName, query, execStream).then((v) {
-      // TODO(nlacasse): Is throwing the correct behavior here?  Consider
-      // returning a tuple (Stream<mojom.Result>, Future) and resolve the
-      // Future at the end of the RPC (with an error if applicable).  Then
-      // errors will be handled the same in this method as in all the other
-      // methods that return Futures.  (Even though the other methods seem to
-      // "throw", they are actually resolving a Future since the function is
-      // declared with "async".)
+    _proxy.ptr.dbExec(fullName, query, stub).then((v) {
+      // TODO(nlacasse): Same question regarding throwing behavior as TableScan.
       if (isError(v.err)) throw v.err;
     });
 
@@ -120,16 +116,30 @@
   }
 }
 
-class ExecStreamImpl implements mojom.ExecStream {
+class ExecStreamImpl extends Object
+    with StreamFlowControl
+    implements mojom.ExecStream {
   final StreamController<mojom.Result> sc;
-  ExecStreamImpl._fromStreamController(this.sc);
 
-  onResult(mojom.Result result) {
+  ExecStreamImpl._fromStreamController(this.sc) {
+    initFlowControl(this.sc);
+  }
+
+  onResult(mojom.Result result, [Function newAck = null]) {
+    // NOTE(aghassemi): We need to make newAck optional to match mojo's
+    // define class, but newAck is always provided by mojo when called.
+    if (newAck == null) {
+      throw new ArgumentError('newAck can not be null');
+    }
     sc.add(result);
+
+    // Only ack after we get unlocked.
+    // If we are not locked, onNextUnlock returns immediately.
+    return onNextUnlock().then((_) => newAck());
   }
 
   // Called by the mojo proxy when the Go function call returns.
-  onReturn(mojom.Error err) {
+  onDone(mojom.Error err) {
     if (isError(err)) {
       sc.addError(err);
     }
diff --git a/dart/lib/src/nosql/table.dart b/dart/lib/src/nosql/table.dart
index c3f0e92..67d4d61 100644
--- a/dart/lib/src/nosql/table.dart
+++ b/dart/lib/src/nosql/table.dart
@@ -121,7 +121,7 @@
   }
 
   // Called by the mojom proxy when the Go function call returns.
-  onReturn(mojom.Error err) {
+  onDone(mojom.Error err) {
     if (isError(err)) {
       sc.addError(err);
     }
diff --git a/dart/test/integration/syncbase_database_test.dart b/dart/test/integration/syncbase_database_test.dart
index 8d1c30b..96642e9 100644
--- a/dart/test/integration/syncbase_database_test.dart
+++ b/dart/test/integration/syncbase_database_test.dart
@@ -150,5 +150,39 @@
     expect(testing.DatabaseWatch.onChangeCounter.count, equals(numOperations));
   });
 
+  // TODO(aghassemi): Currently queries do not work with non-VOM-encoded values.
+  // See issue: https://github.com/vanadium/issues/issues/766
+
+  // test('basic exec', () async {
+  //   var app = c.app(utils.uniqueName('app'));
+  //   await app.create(utils.emptyPerms());
+  //   var db = app.noSqlDatabase(utils.uniqueName('db'));
+  //   await db.create(utils.emptyPerms());
+  //   var table = db.table('airports');
+  //   await table.create(utils.emptyPerms());
+  //
+  //   await table.put('sfo', UTF8.encode('San Francisco'));
+  //   await table.put('yyz', UTF8.encode('Toronto'));
+  //
+  //   var query = 'select k as code, v as cityname from airports';
+  //   var resultStream = db.exec(query);
+  //
+  //   var results = await resultStream.toList();
+  //
+  //   // Expect first entry to be column headers.
+  //   var headers = results[0].values;
+  //   expect(headers, equals([UTF8.decode('code'), UTF8.decode('cityname')]));
+  //
+  //   // Expect the two entries
+  //   var entry1 = results[1].values;
+  //   expect(entry1, equals([UTF8.decode('sfo'), UTF8.decode('San Francisco')]));
+  //
+  //   var entry2 = results[2].values;
+  //   expect(entry2, equals([UTF8.decode('yyz'), UTF8.decode('Toronto')]));
+  //
+  //   // Expect no more entries than two data rows and one column header.
+  //   expect(resultStream.length, 3);
+  // });
+
   // TODO(nlacasse): Test database.get/setPermissions.
 }
diff --git a/mojom/syncbase.mojom b/mojom/syncbase.mojom
index 5fddb56..e03fade 100644
--- a/mojom/syncbase.mojom
+++ b/mojom/syncbase.mojom
@@ -51,9 +51,12 @@
   array<array<uint8>> values;
 };
 
+// In this model, the server pushes one stream item to the client, then waits
+// for an ack from the client before sending the next item, thus providing
+// flow control.
 interface ExecStream {
-  OnResult(Result result);
-  OnReturn(Error err);
+  OnResult(Result result) => ();
+  OnDone(Error err);
 };
 
 struct KeyValue {
@@ -61,13 +64,12 @@
   array<uint8> value;
 };
 
-// ScanStream implements the observer + ack pattern.
-// In this model, server pushes a change object and waits until it receives
-// an ack back from client before sending the next KeyValue, effectively
-// creating a mechanism for flow control.
+// In this model, the server pushes one stream item to the client, then waits
+// for an ack from the client before sending the next item, thus providing
+// flow control.
 interface ScanStream {
   OnKeyValue(KeyValue key_value) => ();
-  OnReturn(Error err);
+  OnDone(Error err);
 };
 
 struct WatchChange {
@@ -85,10 +87,9 @@
   array<uint8> resume_marker;
 };
 
-// WatchGlobStream implements the observer + ack pattern.
-// In this model, server pushes a change object and waits until it receives
-// an ack back from client before sending the next change, effectively creating
-// a mechanism for flow control.
+// In this model, the server pushes one stream item to the client, then waits
+// for an ack from the client before sending the next item, thus providing
+// flow control.
 interface WatchGlobStream {
   OnChange(WatchChange change) => ();
   OnError(Error err);