mojo/syncbase: Implementing Exec with flow control.
MultiPart: 2/2
Change-Id: Ieda02c379960f6dbe6e86f72bffe10e64b4fa101
diff --git a/dart/lib/gen/dart-gen/mojom/lib/mojo/syncbase.mojom.dart b/dart/lib/gen/dart-gen/mojom/lib/mojo/syncbase.mojom.dart
index 6e420bf..04da065 100644
--- a/dart/lib/gen/dart-gen/mojom/lib/mojo/syncbase.mojom.dart
+++ b/dart/lib/gen/dart-gen/mojom/lib/mojo/syncbase.mojom.dart
@@ -996,26 +996,82 @@
}
-class ExecStreamOnReturnParams extends bindings.Struct {
+class ExecStreamOnResultResponseParams extends bindings.Struct {
static const List<bindings.StructDataHeader> kVersions = const [
- const bindings.StructDataHeader(16, 0)
+ const bindings.StructDataHeader(8, 0)
];
- Error err = null;
- ExecStreamOnReturnParams() : super(kVersions.last.size);
+ ExecStreamOnResultResponseParams() : super(kVersions.last.size);
- static ExecStreamOnReturnParams deserialize(bindings.Message message) {
+ static ExecStreamOnResultResponseParams deserialize(bindings.Message message) {
var decoder = new bindings.Decoder(message);
var result = decode(decoder);
decoder.excessHandles.forEach((h) => h.close());
return result;
}
- static ExecStreamOnReturnParams decode(bindings.Decoder decoder0) {
+ static ExecStreamOnResultResponseParams decode(bindings.Decoder decoder0) {
if (decoder0 == null) {
return null;
}
- ExecStreamOnReturnParams result = new ExecStreamOnReturnParams();
+ ExecStreamOnResultResponseParams result = new ExecStreamOnResultResponseParams();
+
+ var mainDataHeader = decoder0.decodeStructDataHeader();
+ if (mainDataHeader.version <= kVersions.last.version) {
+ // Scan in reverse order to optimize for more recent versions.
+ for (int i = kVersions.length - 1; i >= 0; --i) {
+ if (mainDataHeader.version >= kVersions[i].version) {
+ if (mainDataHeader.size == kVersions[i].size) {
+ // Found a match.
+ break;
+ }
+ throw new bindings.MojoCodecError(
+ 'Header size doesn\'t correspond to known version size.');
+ }
+ }
+ } else if (mainDataHeader.size < kVersions.last.size) {
+ throw new bindings.MojoCodecError(
+ 'Message newer than the last known version cannot be shorter than '
+ 'required by the last known version.');
+ }
+ return result;
+ }
+
+ void encode(bindings.Encoder encoder) {
+ encoder.getStructEncoderAtOffset(kVersions.last);
+ }
+
+ String toString() {
+ return "ExecStreamOnResultResponseParams("")";
+ }
+
+ Map toJson() {
+ Map map = new Map();
+ return map;
+ }
+}
+
+
+class ExecStreamOnDoneParams extends bindings.Struct {
+ static const List<bindings.StructDataHeader> kVersions = const [
+ const bindings.StructDataHeader(16, 0)
+ ];
+ Error err = null;
+
+ ExecStreamOnDoneParams() : super(kVersions.last.size);
+
+ static ExecStreamOnDoneParams deserialize(bindings.Message message) {
+ var decoder = new bindings.Decoder(message);
+ var result = decode(decoder);
+ decoder.excessHandles.forEach((h) => h.close());
+ return result;
+ }
+
+ static ExecStreamOnDoneParams decode(bindings.Decoder decoder0) {
+ if (decoder0 == null) {
+ return null;
+ }
+ ExecStreamOnDoneParams result = new ExecStreamOnDoneParams();
var mainDataHeader = decoder0.decodeStructDataHeader();
if (mainDataHeader.version <= kVersions.last.version) {
@@ -1050,7 +1106,7 @@
}
String toString() {
- return "ExecStreamOnReturnParams("
+ return "ExecStreamOnDoneParams("
"err: $err" ")";
}
@@ -1184,26 +1240,26 @@
}
-class ScanStreamOnReturnParams extends bindings.Struct {
+class ScanStreamOnDoneParams extends bindings.Struct {
static const List<bindings.StructDataHeader> kVersions = const [
const bindings.StructDataHeader(16, 0)
];
Error err = null;
- ScanStreamOnReturnParams() : super(kVersions.last.size);
+ ScanStreamOnDoneParams() : super(kVersions.last.size);
- static ScanStreamOnReturnParams deserialize(bindings.Message message) {
+ static ScanStreamOnDoneParams deserialize(bindings.Message message) {
var decoder = new bindings.Decoder(message);
var result = decode(decoder);
decoder.excessHandles.forEach((h) => h.close());
return result;
}
- static ScanStreamOnReturnParams decode(bindings.Decoder decoder0) {
+ static ScanStreamOnDoneParams decode(bindings.Decoder decoder0) {
if (decoder0 == null) {
return null;
}
- ScanStreamOnReturnParams result = new ScanStreamOnReturnParams();
+ ScanStreamOnDoneParams result = new ScanStreamOnDoneParams();
var mainDataHeader = decoder0.decodeStructDataHeader();
if (mainDataHeader.version <= kVersions.last.version) {
@@ -1238,7 +1294,7 @@
}
String toString() {
- return "ScanStreamOnReturnParams("
+ return "ScanStreamOnDoneParams("
"err: $err" ")";
}
@@ -7420,14 +7476,14 @@
}
const int kExecStream_onResult_name = 0;
-const int kExecStream_onReturn_name = 1;
+const int kExecStream_onDone_name = 1;
const String ExecStreamName =
'mojo::ExecStream';
abstract class ExecStream {
- void onResult(Result result);
- void onReturn(Error err);
+ Future<ExecStreamOnResultResponseParams> onResult(Result result,[Function responseFactory = null]);
+ void onDone(Error err);
}
@@ -7451,6 +7507,20 @@
void handleResponse(bindings.ServiceMessage message) {
switch (message.header.type) {
+ case kExecStream_onResult_name:
+ var r = ExecStreamOnResultResponseParams.deserialize(
+ message.payload);
+ if (!message.header.hasRequestId) {
+ throw 'Expected a message with a valid request Id.';
+ }
+ Completer c = completerMap[message.header.requestId];
+ if (c == null) {
+ throw 'Message had unknown request Id: ${message.header.requestId}';
+ }
+ completerMap.remove(message.header.requestId);
+ assert(!c.isCompleted);
+ c.complete(r);
+ break;
default:
throw new bindings.MojoCodecError("Unexpected message name");
break;
@@ -7468,18 +7538,21 @@
ExecStreamProxyImpl _proxyImpl;
_ExecStreamProxyCalls(this._proxyImpl);
- void onResult(Result result) {
+ Future<ExecStreamOnResultResponseParams> onResult(Result result,[Function responseFactory = null]) {
assert(_proxyImpl.isBound);
var params = new ExecStreamOnResultParams();
params.result = result;
- _proxyImpl.sendMessage(params, kExecStream_onResult_name);
+ return _proxyImpl.sendMessageWithRequestId(
+ params,
+ kExecStream_onResult_name,
+ -1,
+ bindings.MessageHeader.kMessageExpectsResponse);
}
-
- void onReturn(Error err) {
+ void onDone(Error err) {
assert(_proxyImpl.isBound);
- var params = new ExecStreamOnReturnParams();
+ var params = new ExecStreamOnDoneParams();
params.err = err;
- _proxyImpl.sendMessage(params, kExecStream_onReturn_name);
+ _proxyImpl.sendMessage(params, kExecStream_onDone_name);
}
}
@@ -7553,6 +7626,10 @@
static const String name = ExecStreamName;
+ ExecStreamOnResultResponseParams _ExecStreamOnResultResponseParamsFactory() {
+ var result = new ExecStreamOnResultResponseParams();
+ return result;
+ }
Future<bindings.Message> handleMessage(bindings.ServiceMessage message) {
if (bindings.ControlMessageHandler.isControlMessage(message)) {
@@ -7565,12 +7642,20 @@
case kExecStream_onResult_name:
var params = ExecStreamOnResultParams.deserialize(
message.payload);
- _impl.onResult(params.result);
+ return _impl.onResult(params.result,_ExecStreamOnResultResponseParamsFactory).then((response) {
+ if (response != null) {
+ return buildResponseWithId(
+ response,
+ kExecStream_onResult_name,
+ message.header.requestId,
+ bindings.MessageHeader.kMessageIsResponse);
+ }
+ });
break;
- case kExecStream_onReturn_name:
- var params = ExecStreamOnReturnParams.deserialize(
+ case kExecStream_onDone_name:
+ var params = ExecStreamOnDoneParams.deserialize(
message.payload);
- _impl.onReturn(params.err);
+ _impl.onDone(params.err);
break;
default:
throw new bindings.MojoCodecError("Unexpected message name");
@@ -7594,14 +7679,14 @@
}
const int kScanStream_onKeyValue_name = 0;
-const int kScanStream_onReturn_name = 1;
+const int kScanStream_onDone_name = 1;
const String ScanStreamName =
'mojo::ScanStream';
abstract class ScanStream {
Future<ScanStreamOnKeyValueResponseParams> onKeyValue(KeyValue keyValue,[Function responseFactory = null]);
- void onReturn(Error err);
+ void onDone(Error err);
}
@@ -7666,11 +7751,11 @@
-1,
bindings.MessageHeader.kMessageExpectsResponse);
}
- void onReturn(Error err) {
+ void onDone(Error err) {
assert(_proxyImpl.isBound);
- var params = new ScanStreamOnReturnParams();
+ var params = new ScanStreamOnDoneParams();
params.err = err;
- _proxyImpl.sendMessage(params, kScanStream_onReturn_name);
+ _proxyImpl.sendMessage(params, kScanStream_onDone_name);
}
}
@@ -7770,10 +7855,10 @@
}
});
break;
- case kScanStream_onReturn_name:
- var params = ScanStreamOnReturnParams.deserialize(
+ case kScanStream_onDone_name:
+ var params = ScanStreamOnDoneParams.deserialize(
message.payload);
- _impl.onReturn(params.err);
+ _impl.onDone(params.err);
break;
default:
throw new bindings.MojoCodecError("Unexpected message name");
diff --git a/dart/lib/src/nosql/database.dart b/dart/lib/src/nosql/database.dart
index 6fa42bc..3649552 100644
--- a/dart/lib/src/nosql/database.dart
+++ b/dart/lib/src/nosql/database.dart
@@ -45,17 +45,13 @@
Stream<mojom.Result> exec(String query) {
StreamController<mojom.Result> sc = new StreamController();
- mojom.ExecStream execStream = new ExecStreamImpl._fromStreamController(sc);
+
+ mojom.ExecStreamStub stub = new mojom.ExecStreamStub.unbound();
+ stub.impl = new ExecStreamImpl._fromStreamController(sc);
// Call dbExec asynchronously.
- _proxy.ptr.dbExec(fullName, query, execStream).then((v) {
- // TODO(nlacasse): Is throwing the correct behavior here? Consider
- // returning a tuple (Stream<mojom.Result>, Future) and resolve the
- // Future at the end of the RPC (with an error if applicable). Then
- // errors will be handled the same in this method as in all the other
- // methods that return Futures. (Even though the other methods seem to
- // "throw", they are actually resolving a Future since the function is
- // declared with "async".)
+ _proxy.ptr.dbExec(fullName, query, stub).then((v) {
+ // TODO(nlacasse): Same question regarding throwing behavior as TableScan.
if (isError(v.err)) throw v.err;
});
@@ -120,16 +116,30 @@
}
}
-class ExecStreamImpl implements mojom.ExecStream {
+class ExecStreamImpl extends Object
+ with StreamFlowControl
+ implements mojom.ExecStream {
final StreamController<mojom.Result> sc;
- ExecStreamImpl._fromStreamController(this.sc);
- onResult(mojom.Result result) {
+ ExecStreamImpl._fromStreamController(this.sc) {
+ initFlowControl(this.sc);
+ }
+
+ onResult(mojom.Result result, [Function newAck = null]) {
+ // NOTE(aghassemi): We need to make newAck optional to match mojo's
+ // define class, but newAck is always provided by mojo when called.
+ if (newAck == null) {
+ throw new ArgumentError('newAck can not be null');
+ }
sc.add(result);
+
+ // Only ack after we get unlocked.
+ // If we are not locked, onNextUnlock returns immediately.
+ return onNextUnlock().then((_) => newAck());
}
// Called by the mojo proxy when the Go function call returns.
- onReturn(mojom.Error err) {
+ onDone(mojom.Error err) {
if (isError(err)) {
sc.addError(err);
}
diff --git a/dart/lib/src/nosql/table.dart b/dart/lib/src/nosql/table.dart
index c3f0e92..67d4d61 100644
--- a/dart/lib/src/nosql/table.dart
+++ b/dart/lib/src/nosql/table.dart
@@ -121,7 +121,7 @@
}
// Called by the mojom proxy when the Go function call returns.
- onReturn(mojom.Error err) {
+ onDone(mojom.Error err) {
if (isError(err)) {
sc.addError(err);
}
diff --git a/dart/test/integration/syncbase_database_test.dart b/dart/test/integration/syncbase_database_test.dart
index 8d1c30b..96642e9 100644
--- a/dart/test/integration/syncbase_database_test.dart
+++ b/dart/test/integration/syncbase_database_test.dart
@@ -150,5 +150,39 @@
expect(testing.DatabaseWatch.onChangeCounter.count, equals(numOperations));
});
+ // TODO(aghassemi): Currently queries do not work with non-VOM-encoded values.
+ // See issue: https://github.com/vanadium/issues/issues/766
+
+ // test('basic exec', () async {
+ // var app = c.app(utils.uniqueName('app'));
+ // await app.create(utils.emptyPerms());
+ // var db = app.noSqlDatabase(utils.uniqueName('db'));
+ // await db.create(utils.emptyPerms());
+ // var table = db.table('airports');
+ // await table.create(utils.emptyPerms());
+ //
+ // await table.put('sfo', UTF8.encode('San Francisco'));
+ // await table.put('yyz', UTF8.encode('Toronto'));
+ //
+ // var query = 'select k as code, v as cityname from airports';
+ // var resultStream = db.exec(query);
+ //
+ // var results = await resultStream.toList();
+ //
+ // // Expect first entry to be column headers.
+ // var headers = results[0].values;
+ // expect(headers, equals([UTF8.decode('code'), UTF8.decode('cityname')]));
+ //
+ // // Expect the two entries
+ // var entry1 = results[1].values;
+ // expect(entry1, equals([UTF8.decode('sfo'), UTF8.decode('San Francisco')]));
+ //
+ // var entry2 = results[2].values;
+ // expect(entry2, equals([UTF8.decode('yyz'), UTF8.decode('Toronto')]));
+ //
+ // // Expect no more entries than two data rows and one column header.
+ // expect(resultStream.length, 3);
+ // });
+
// TODO(nlacasse): Test database.get/setPermissions.
}
diff --git a/mojom/syncbase.mojom b/mojom/syncbase.mojom
index 5fddb56..e03fade 100644
--- a/mojom/syncbase.mojom
+++ b/mojom/syncbase.mojom
@@ -51,9 +51,12 @@
array<array<uint8>> values;
};
+// In this model, the server pushes one stream item to the client, then waits
+// for an ack from the client before sending the next item, thus providing
+// flow control.
interface ExecStream {
- OnResult(Result result);
- OnReturn(Error err);
+ OnResult(Result result) => ();
+ OnDone(Error err);
};
struct KeyValue {
@@ -61,13 +64,12 @@
array<uint8> value;
};
-// ScanStream implements the observer + ack pattern.
-// In this model, server pushes a change object and waits until it receives
-// an ack back from client before sending the next KeyValue, effectively
-// creating a mechanism for flow control.
+// In this model, the server pushes one stream item to the client, then waits
+// for an ack from the client before sending the next item, thus providing
+// flow control.
interface ScanStream {
OnKeyValue(KeyValue key_value) => ();
- OnReturn(Error err);
+ OnDone(Error err);
};
struct WatchChange {
@@ -85,10 +87,9 @@
array<uint8> resume_marker;
};
-// WatchGlobStream implements the observer + ack pattern.
-// In this model, server pushes a change object and waits until it receives
-// an ack back from client before sending the next change, effectively creating
-// a mechanism for flow control.
+// In this model, the server pushes one stream item to the client, then waits
+// for an ack from the client before sending the next item, thus providing
+// flow control.
interface WatchGlobStream {
OnChange(WatchChange change) => ();
OnError(Error err);