TBR croupier: Proper Scan => Watch
Earlier, I started up watch changes but I could miss the initial
changes written to the table (e.g., if syncing occurred before
watch could begin).
Now I have a watchEverything function that sets up watch for me
and neatly handles batch sorting and cancellation.
Note: I also fixed a few UI-related bugs and have stopped
invoking this with the proxy.
Closes https://github.com/vanadium/issues/issues/978
Change-Id: Ifb6270363cb9778553f2a0ed9140c85d931d1c4c
diff --git a/Makefile b/Makefile
index 6190ee5..d62cdf5 100644
--- a/Makefile
+++ b/Makefile
@@ -34,8 +34,7 @@
SYNCBASE_FLAGS += --logtostderr=true \
--root-dir=$(APP_HOME_DIR)/syncbase_data \
- --v23.credentials=$(ANDROID_CREDS_DIR) \
- --v23.proxy=proxy
+ --v23.credentials=$(ANDROID_CREDS_DIR)
# If this is not the first mojo shell, then you must reuse the dev servers
# to avoid a "port in use" error.
diff --git a/lib/components/hearts/hearts.part.dart b/lib/components/hearts/hearts.part.dart
index 8cd0689..c4d193e 100644
--- a/lib/components/hearts/hearts.part.dart
+++ b/lib/components/hearts/hearts.part.dart
@@ -92,6 +92,24 @@
_reset();
}
+ // Set the trickTaking flag on each build.
+ if (!trickTaking) {
+ if (_detectTrick()) {
+ trickTaking = true;
+ _fillMissingPlayedCard();
+ // Unfortunately, ZCards are drawn on the game layer,
+ // so instead of setState, we must use trueSetState.
+ new Future.delayed(const Duration(milliseconds: SHOW_TRICK_DURATION),
+ () {
+ setState(() {
+ trickTaking = false;
+ });
+ });
+ } else {
+ _fillPlayedCards();
+ }
+ }
+
// Hearts Widget
Widget heartsWidget = new Container(
decoration: new BoxDecoration(backgroundColor: Colors.grey[300]),
@@ -433,22 +451,6 @@
List<Widget> cardCollections = new List<Widget>();
if (_showSplitView) {
- if (!trickTaking) {
- if (_detectTrick()) {
- trickTaking = true;
- _fillMissingPlayedCard();
- // Unfortunately, ZCards are drawn on the game layer,
- // so instead of setState, we must use trueSetState.
- new Future.delayed(const Duration(milliseconds: SHOW_TRICK_DURATION),
- () {
- setState(() {
- trickTaking = false;
- });
- });
- } else {
- _fillPlayedCards();
- }
- }
cardCollections.add(new Container(
decoration:
new BoxDecoration(backgroundColor: style.theme.primaryColor),
diff --git a/lib/src/syncbase/croupier_client.dart b/lib/src/syncbase/croupier_client.dart
index b3714cf..dd1b57f 100644
--- a/lib/src/syncbase/croupier_client.dart
+++ b/lib/src/syncbase/croupier_client.dart
@@ -2,17 +2,18 @@
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
-import '../../settings/client.dart' as settings_client;
-import 'discovery_client.dart' show DiscoveryClient;
-import 'util.dart' as util;
-
import 'dart:async';
+import 'dart:convert' show UTF8;
import 'dart:io' show Platform;
-import 'package:v23discovery/discovery.dart' as discovery;
import 'package:flutter/services.dart' show shell;
import 'package:syncbase/src/naming/util.dart' as naming;
import 'package:syncbase/syncbase_client.dart' as sc;
+import 'package:v23discovery/discovery.dart' as discovery;
+
+import '../../settings/client.dart' as settings_client;
+import 'discovery_client.dart' show DiscoveryClient;
+import 'util.dart' as util;
class CroupierClient {
final sc.SyncbaseClient _syncbaseClient;
@@ -75,22 +76,21 @@
return db;
}
- Completer _tableLock;
-
- // TODO(alexfandrianto): Try not to call this twice at the same time.
- // That would lead to very race-y behavior.
+ // Use Table Lock to sort out multiple calls to createTable.
+ Map<String, Completer<sc.SyncbaseTable>> _tableLock =
+ new Map<String, Completer<sc.SyncbaseTable>>();
Future<sc.SyncbaseTable> createTable(
sc.SyncbaseDatabase db, String tableName) async {
- if (_tableLock != null) {
- await _tableLock.future;
+ if (_tableLock[tableName] != null) {
+ return _tableLock[tableName].future;
}
- _tableLock = new Completer();
- var table = db.table(tableName);
+ _tableLock[tableName] = new Completer<sc.SyncbaseTable>();
+ sc.SyncbaseTable table = db.table(tableName);
if (!(await table.exists())) {
await table.create(util.openPerms);
}
util.log('CroupierClient: ${tableName} is ready');
- _tableLock.complete();
+ _tableLock[tableName].complete(table);
return table;
}
@@ -152,6 +152,92 @@
String sgName = naming.join(sgPrefix, suffix);
return sgName;
}
+
+ // Creates a watch stream that lets you watch a bunch of updates.
+ // In order to guarantee that you see everything, it also scans the whole
+ // table and sorts the watch updates (if asked).
+ // Finally, the return value is a completer that when completed stops
+ // listening to the watch stream.
+ Future<StreamSubscription<sc.WatchChange>> watchEverything(
+ sc.SyncbaseDatabase db,
+ String tbName,
+ String prefix,
+ util.asyncKeyValueCallback onChange,
+ {Comparator<sc.WatchChange> sorter}) async {
+ util.log('Watching for changes on ${tbName}:${prefix}...');
+
+ // For safety, be certain that the syncbase table at tbName exists.
+ await createTable(db, tbName);
+
+ // In order to safely read all watch updates, we have to do a scan first.
+ // See https://github.com/vanadium/issues/issues/917
+ sc.SyncbaseBatchDatabase sbdb =
+ await db.beginBatch(sc.SyncbaseClient.batchOptions(readOnly: true));
+ List<int> resumeMarker;
+ try {
+ resumeMarker = await sbdb.getResumeMarker();
+
+ sc.SyncbaseTable tb = sbdb.table(tbName);
+
+ await tb
+ .scan(new sc.RowRange.prefix(prefix))
+ .forEach((sc.KeyValue kv) async {
+ String key = kv.key;
+ String value = UTF8.decode(kv.value);
+ print("Scan found ${key}, ${value}");
+ await onChange(key, value, true);
+ });
+ } finally {
+ await sbdb.abort();
+ }
+
+ // Now we can start watching from the batch's resume marker.
+ Stream<sc.WatchChange> watchStream = db.watch(tbName, prefix, resumeMarker);
+
+ // This list will queue up the changes of a watch batch sequence.
+ List<sc.WatchChange> watchSequence = new List<sc.WatchChange>();
+
+ // Define a change handler that will be applied whenever a WatchChange
+ // arrives on the watchStream.
+ _handleChange(sc.WatchChange wc) async {
+ // Accumulate the WatchChange's in watchSequence.
+ watchSequence.add(wc);
+ if (wc.continued) {
+ // Since there are more WatchChange's to collect, do not act yet.
+ return;
+ } else {
+ // 1. Possibly sort the watchSequence by the optional sorter function.
+ if (sorter != null) {
+ watchSequence.sort(sorter);
+ }
+
+ // 2. Then run through each value in order.
+ watchSequence.forEach((sc.WatchChange _w) async {
+ String key = _w.rowKey;
+ String value;
+ switch (_w.changeType) {
+ case sc.WatchChangeTypes.put:
+ value = UTF8.decode(_w.valueBytes);
+ break;
+ case sc.WatchChangeTypes.delete:
+ value = null;
+ break;
+ default:
+ assert(false);
+ }
+
+ print("Watch found ${key}, ${value}");
+ await onChange(key, value, false);
+ });
+
+ // 3. Then clear the watchSequence.
+ watchSequence.clear();
+ }
+ }
+
+ // Start listening on the watch stream using the change handler.
+ return watchStream.listen((sc.WatchChange wc) => _handleChange(wc));
+ }
}
// Example implementation of a ScanHandler.
diff --git a/lib/src/syncbase/log_writer.dart b/lib/src/syncbase/log_writer.dart
index bfb278e..a8ea950 100644
--- a/lib/src/syncbase/log_writer.dart
+++ b/lib/src/syncbase/log_writer.dart
@@ -41,9 +41,9 @@
// Affects read/write/watch locations of the log writer.
String logPrefix = ''; // This is usually set to <game_id>/log
- // An internal boolean that should be set to true when watching and reset to
- // false once watch should be turned off.
- bool _watching = false;
+ // An internal StreamSubscription that should be canceled once we are done
+ // watching on this log.
+ StreamSubscription<WatchChange> _watchSubscription;
// When a proposal is made (or received), this flag is set to true.
// Once a consensus has been reached, this is set to false again.
@@ -81,74 +81,18 @@
tb = await _cc.createTable(db, tbName);
// Start to watch the stream.
- Stream<WatchChange> watchStream =
- db.watch(tbName, this.logPrefix, UTF8.encode("now"));
- _startWatch(watchStream); // Don't wait for this future.
+ this._watchSubscription = await _cc.watchEverything(
+ db, tbName, this.logPrefix, _onChange,
+ sorter: (WatchChange a, WatchChange b) {
+ return a.rowKey.compareTo(b.rowKey);
+ });
}
- Future _startWatch(Stream<WatchChange> watchStream) async {
- util.log('watching for changes...');
- _watching = true;
-
- List<WatchChange> watchSequence = new List<WatchChange>();
-
- // This stream never really ends, so it will watch forever.
- // https://github.com/vanadium/issues/issues/833
- // To break out of the watch handler, we can use the _watching flag.
- // However, the for loop will only break on the watch update after _watching
- // is set to false.
- await for (WatchChange wc in watchStream) {
- if (!this._watching) {
- break;
- }
-
- // Accumulate the WatchChange's in watchSequence.
- watchSequence.add(wc);
- if (wc.continued) {
- // Since there are more WatchChange's to collect, do not act yet.
- continue;
- } else {
- // 1. Sort the watchSequence by timestamp.
- // Note: The rowKeys for the logPrefix can be sorted this way.
- // It should not matter if proposals gets mixed up with writes.
- watchSequence.sort((WatchChange a, WatchChange b) {
- return a.rowKey.compareTo(b.rowKey);
- });
-
- // 2. Then run through each value in order.
- watchSequence.forEach((WatchChange w) {
- _handleWatchChange(w);
- });
-
- // 3. Then clear the watchSequence.
- watchSequence.clear();
- }
- }
- }
-
- Future _handleWatchChange(WatchChange wc) async {
- assert(wc.tableName == tbName);
- util.log('Watch Key: ${wc.rowKey}');
- util.log('Watch Value ${UTF8.decode(wc.valueBytes)}');
- String key = wc.rowKey.replaceFirst("${this.logPrefix}/", "");
- if (key == wc.rowKey) {
- print("Lacks prefix '${this.logPrefix}/', skipping...");
- return;
- }
- String value;
- switch (wc.changeType) {
- case WatchChangeTypes.put:
- value = UTF8.decode(wc.valueBytes);
- break;
- case WatchChangeTypes.delete:
- value = null;
- break;
- default:
- assert(false);
- }
+ Future _onChange(String rowKey, String value, bool duringScan) async {
+ String key = rowKey.replaceFirst("${this.logPrefix}/", "");
if (_isProposalKey(key)) {
- if (value != null && !_acceptedProposals.contains(key)) {
+ if (value != null && !_acceptedProposals.contains(key) && !duringScan) {
await _receiveProposal(key, value);
}
} else {
@@ -158,7 +102,10 @@
}
void close() {
- this._watching = false;
+ if (this._watchSubscription != null) {
+ this._watchSubscription.cancel();
+ this._watchSubscription = null;
+ }
}
Future write(SimulLevel s, String value) async {
diff --git a/lib/src/syncbase/settings_manager.dart b/lib/src/syncbase/settings_manager.dart
index 32f63a0..55a71b8 100644
--- a/lib/src/syncbase/settings_manager.dart
+++ b/lib/src/syncbase/settings_manager.dart
@@ -54,10 +54,14 @@
tb = await _cc.createTable(db, util.tableNameSettings);
// Start to watch the stream for the shared settings table.
- Stream<sc.WatchChange> watchStream = db.watch(util.tableNameSettings,
- util.settingsWatchSyncPrefix, UTF8.encode("now"));
- _startWatchSettings(watchStream); // Don't wait for this future.
- _loadSettings(tb); // Don't wait for this future.
+ await _cc.watchEverything(db, util.tableNameSettings,
+ util.settingsWatchSyncPrefix, _onSettingsChange);
+ }
+
+ // In the case of the settings manager, we're checking for any changes to
+ // any person's Croupier Settings.
+ Future _onSettingsChange(String key, String value, bool duringScan) async {
+ this.updateSettingsCallback(util.userIDFromSettingsDataKey(key), value);
}
// Guaranteed to be called when the program starts.
@@ -99,35 +103,6 @@
.put(UTF8.encode(jsonString));
}
- // This watch method ensures that any changes are propagated to the caller.
- // In the case of the settings manager, we're checking for any changes to
- // any person's Croupier Settings.
- Future _startWatchSettings(Stream<sc.WatchChange> watchStream) async {
- util.log('Settings watching for changes...');
- // This stream never really ends, so I guess we'll watch forever.
- await for (sc.WatchChange wc in watchStream) {
- assert(wc.tableName == util.tableNameSettings);
- util.log('Watch Key: ${wc.rowKey}');
- util.log('Watch Value ${UTF8.decode(wc.valueBytes)}');
- String key = wc.rowKey;
- String value;
- switch (wc.changeType) {
- case sc.WatchChangeTypes.put:
- value = UTF8.decode(wc.valueBytes);
- break;
- case sc.WatchChangeTypes.delete:
- value = null;
- break;
- default:
- assert(false);
- }
-
- if (this.updateSettingsCallback != null) {
- this.updateSettingsCallback(util.userIDFromSettingsDataKey(key), value);
- }
- }
- }
-
// Best called after load(), to ensure that there are settings in the table.
Future createSettingsSyncgroup() async {
int id = await _getUserID();
@@ -141,53 +116,32 @@
return _cc.makeSyncgroupName(await _syncSettingsSuffix());
}
- // This watch method ensures that any changes are propagated to the caller.
- // In this case, we're forwarding any player changes to the Croupier logic.
- // We also catch the game status signals.
- Future _startWatchGame(Stream<sc.WatchChange> watchStream) async {
- util.log('Game watching for changes...');
- // This stream never really ends, so I guess we'll watch forever.
- await for (sc.WatchChange wc in watchStream) {
- assert(wc.tableName == util.tableNameGames);
- util.log('Watch Key: ${wc.rowKey}');
- util.log('Watch Value ${UTF8.decode(wc.valueBytes)}');
- String key = wc.rowKey;
- String value;
- switch (wc.changeType) {
- case sc.WatchChangeTypes.put:
- value = UTF8.decode(wc.valueBytes);
- break;
- case sc.WatchChangeTypes.delete:
- value = null;
- break;
- default:
- assert(false);
+ // Forward any player changes and game status signals to Croupier's logic.
+ // TODO(alexfandrianto): This also watches the log (but doesn't process it.
+ Future _onGameChange(String key, String value, bool duringScan) async {
+ if (key.indexOf("/players") != -1) {
+ if (this.updatePlayerFoundCallback != null) {
+ String type = util.playerUpdateTypeFromPlayerKey(key);
+ switch (type) {
+ case "player_number":
+ // Update the player number for this player.
+ this.updatePlayerFoundCallback(key, value);
+ break;
+ case "settings_sg":
+ // Join this player's settings syncgroup.
+ _cc.joinSyncgroup(value);
+
+ // Also, signal that this player has been found.
+ this.updatePlayerFoundCallback(key, null);
+ break;
+ default:
+ print("Unexpected key: ${key} with value ${value}");
+ assert(false);
+ }
}
-
- if (key.indexOf("/players") != -1) {
- if (this.updatePlayerFoundCallback != null) {
- String type = util.playerUpdateTypeFromPlayerKey(key);
- switch (type) {
- case "player_number":
- // Update the player number for this player.
- this.updatePlayerFoundCallback(key, value);
- break;
- case "settings_sg":
- // Join this player's settings syncgroup.
- _cc.joinSyncgroup(value);
-
- // Also, signal that this player has been found.
- this.updatePlayerFoundCallback(key, null);
- break;
- default:
- print("Unexpected key: ${key} with value ${value}");
- assert(false);
- }
- }
- } else if (key.indexOf("/status") != -1) {
- if (this.updateGameStatusCallback != null) {
- this.updateGameStatusCallback(key, value);
- }
+ } else if (key.indexOf("/status") != -1) {
+ if (this.updateGameStatusCallback != null) {
+ this.updateGameStatusCallback(key, value);
}
}
}
@@ -199,9 +153,8 @@
sc.SyncbaseTable gameTable = await _cc.createTable(db, util.tableNameGames);
// Watch for the players in the game.
- Stream<sc.WatchChange> watchStream = db.watch(
- util.tableNameGames, util.syncgamePrefix(gameID), UTF8.encode("now"));
- _startWatchGame(watchStream); // Don't wait for this future.
+ await _cc.watchEverything(
+ db, util.tableNameGames, util.syncgamePrefix(gameID), _onGameChange);
print("Now writing to some rows of ${gameID}");
// Start up the table and write yourself as player 0.
@@ -231,9 +184,8 @@
sc.SyncbaseTable gameTable = await _cc.createTable(db, util.tableNameGames);
// Watch for the players in the game.
- Stream<sc.WatchChange> watchStream = db.watch(
- util.tableNameGames, util.syncgamePrefix(gameID), UTF8.encode("now"));
- _startWatchGame(watchStream); // Don't wait for this future.
+ await _cc.watchEverything(
+ db, util.tableNameGames, util.syncgamePrefix(gameID), _onGameChange);
await _cc.joinSyncgroup(sgName);
@@ -260,20 +212,6 @@
await gameTable.row(util.gameStatusKey(gameID)).put(UTF8.encode(status));
}
- // When starting the settings manager, there may be settings already in the
- // store. Make sure to load those.
- Future _loadSettings(sc.SyncbaseTable tb) async {
- tb
- .scan(new sc.RowRange.prefix(util.settingsWatchSyncPrefix))
- .forEach((sc.KeyValue kv) {
- if (util.isSettingsKey(kv.key)) {
- // Then we can process the value as if it were settings data.
- this.updateSettingsCallback(
- util.userIDFromSettingsDataKey(kv.key), UTF8.decode(kv.value));
- }
- });
- }
-
// TODO(alexfandrianto): It is possible that the more efficient way of
// scanning is to do it for only short bursts. In that case, we should call
// stopScanSettings a few seconds after starting it.
diff --git a/lib/src/syncbase/util.dart b/lib/src/syncbase/util.dart
index 9523947..af882ba 100644
--- a/lib/src/syncbase/util.dart
+++ b/lib/src/syncbase/util.dart
@@ -2,6 +2,7 @@
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
+import 'dart:async';
import 'package:syncbase/syncbase_client.dart' show Perms, SyncbaseClient;
const String appName = 'app';
@@ -26,6 +27,7 @@
typedef void NoArgCb();
typedef void keyValueCallback(String key, String value);
+typedef Future asyncKeyValueCallback(String key, String value, bool duringScan);
const String openPermsJson =
'{"Admin":{"In":["..."]},"Write":{"In":["..."]},"Read":{"In":["..."]},"Resolve":{"In":["..."]},"Debug":{"In":["..."]}}';
diff --git a/shortcut_template b/shortcut_template
index a27b681..5cd8e15 100644
--- a/shortcut_template
+++ b/shortcut_template
@@ -4,7 +4,7 @@
--verbose
--args-for=mojo:flutter --enable-checked-mode
--enable-multiprocess
---map-origin=https://mojo2.v.io=https://storage.googleapis.com/mojo_services/v23discovery/android/
+--map-origin=https://mojo2.v.io=https://storage.googleapis.com/mojo_services/v23discovery/mojo_services/android/
--args-for=https://mojo2.v.io/discovery.mojo host%DEVICE_ID% mdns
---map-origin=https://mojo.v.io=https://storage.googleapis.com/mojo_services/syncbase/android/
---args-for=https://mojo.v.io/syncbase_server.mojo --v=0 --logtostderr=true --root-dir=/data/data/org.chromium.mojo.shell/app_home/syncbase_data --v23.credentials=/sdcard/v23creds --v23.proxy=proxy %SYNCBASE_NAME_FLAG%
+--map-origin=https://mojo.v.io=https://storage.googleapis.com/mojo_services/syncbase/mojo_services/android/
+--args-for=https://mojo.v.io/syncbase_server.mojo --v=0 --logtostderr=true --root-dir=/data/data/org.chromium.mojo.shell/app_home/syncbase_data --v23.credentials=/sdcard/v23creds %SYNCBASE_NAME_FLAG%