TBR croupier: Proper Scan => Watch

Earlier, I started up watch changes but I could miss the initial
changes written to the table (e.g., if syncing occurred before
watch could begin).

Now I have a watchEverything function that sets up watch for me
and neatly handles batch sorting and cancellation.

Note: I also fixed a few UI-related bugs and have stopped
invoking this with the proxy.

Closes https://github.com/vanadium/issues/issues/978

Change-Id: Ifb6270363cb9778553f2a0ed9140c85d931d1c4c
diff --git a/Makefile b/Makefile
index 6190ee5..d62cdf5 100644
--- a/Makefile
+++ b/Makefile
@@ -34,8 +34,7 @@
 
 	SYNCBASE_FLAGS += --logtostderr=true \
 		--root-dir=$(APP_HOME_DIR)/syncbase_data \
-		--v23.credentials=$(ANDROID_CREDS_DIR) \
-		--v23.proxy=proxy
+		--v23.credentials=$(ANDROID_CREDS_DIR)
 
 # If this is not the first mojo shell, then you must reuse the dev servers
 # to avoid a "port in use" error.
diff --git a/lib/components/hearts/hearts.part.dart b/lib/components/hearts/hearts.part.dart
index 8cd0689..c4d193e 100644
--- a/lib/components/hearts/hearts.part.dart
+++ b/lib/components/hearts/hearts.part.dart
@@ -92,6 +92,24 @@
       _reset();
     }
 
+    // Set the trickTaking flag on each build.
+    if (!trickTaking) {
+      if (_detectTrick()) {
+        trickTaking = true;
+        _fillMissingPlayedCard();
+        // Unfortunately, ZCards are drawn on the game layer,
+        // so instead of setState, we must use trueSetState.
+        new Future.delayed(const Duration(milliseconds: SHOW_TRICK_DURATION),
+            () {
+          setState(() {
+            trickTaking = false;
+          });
+        });
+      } else {
+        _fillPlayedCards();
+      }
+    }
+
     // Hearts Widget
     Widget heartsWidget = new Container(
         decoration: new BoxDecoration(backgroundColor: Colors.grey[300]),
@@ -433,22 +451,6 @@
     List<Widget> cardCollections = new List<Widget>();
 
     if (_showSplitView) {
-      if (!trickTaking) {
-        if (_detectTrick()) {
-          trickTaking = true;
-          _fillMissingPlayedCard();
-          // Unfortunately, ZCards are drawn on the game layer,
-          // so instead of setState, we must use trueSetState.
-          new Future.delayed(const Duration(milliseconds: SHOW_TRICK_DURATION),
-              () {
-            setState(() {
-              trickTaking = false;
-            });
-          });
-        } else {
-          _fillPlayedCards();
-        }
-      }
       cardCollections.add(new Container(
           decoration:
               new BoxDecoration(backgroundColor: style.theme.primaryColor),
diff --git a/lib/src/syncbase/croupier_client.dart b/lib/src/syncbase/croupier_client.dart
index b3714cf..dd1b57f 100644
--- a/lib/src/syncbase/croupier_client.dart
+++ b/lib/src/syncbase/croupier_client.dart
@@ -2,17 +2,18 @@
 // Use of this source code is governed by a BSD-style
 // license that can be found in the LICENSE file.
 
-import '../../settings/client.dart' as settings_client;
-import 'discovery_client.dart' show DiscoveryClient;
-import 'util.dart' as util;
-
 import 'dart:async';
+import 'dart:convert' show UTF8;
 import 'dart:io' show Platform;
 
-import 'package:v23discovery/discovery.dart' as discovery;
 import 'package:flutter/services.dart' show shell;
 import 'package:syncbase/src/naming/util.dart' as naming;
 import 'package:syncbase/syncbase_client.dart' as sc;
+import 'package:v23discovery/discovery.dart' as discovery;
+
+import '../../settings/client.dart' as settings_client;
+import 'discovery_client.dart' show DiscoveryClient;
+import 'util.dart' as util;
 
 class CroupierClient {
   final sc.SyncbaseClient _syncbaseClient;
@@ -75,22 +76,21 @@
     return db;
   }
 
-  Completer _tableLock;
-
-  // TODO(alexfandrianto): Try not to call this twice at the same time.
-  // That would lead to very race-y behavior.
+  // Use Table Lock to sort out multiple calls to createTable.
+  Map<String, Completer<sc.SyncbaseTable>> _tableLock =
+      new Map<String, Completer<sc.SyncbaseTable>>();
   Future<sc.SyncbaseTable> createTable(
       sc.SyncbaseDatabase db, String tableName) async {
-    if (_tableLock != null) {
-      await _tableLock.future;
+    if (_tableLock[tableName] != null) {
+      return _tableLock[tableName].future;
     }
-    _tableLock = new Completer();
-    var table = db.table(tableName);
+    _tableLock[tableName] = new Completer<sc.SyncbaseTable>();
+    sc.SyncbaseTable table = db.table(tableName);
     if (!(await table.exists())) {
       await table.create(util.openPerms);
     }
     util.log('CroupierClient: ${tableName} is ready');
-    _tableLock.complete();
+    _tableLock[tableName].complete(table);
     return table;
   }
 
@@ -152,6 +152,92 @@
     String sgName = naming.join(sgPrefix, suffix);
     return sgName;
   }
+
+  // Creates a watch stream that lets you watch a bunch of updates.
+  // In order to guarantee that you see everything, it also scans the whole
+  // table and sorts the watch updates (if asked).
+  // Finally, the return value is a completer that when completed stops
+  // listening to the watch stream.
+  Future<StreamSubscription<sc.WatchChange>> watchEverything(
+      sc.SyncbaseDatabase db,
+      String tbName,
+      String prefix,
+      util.asyncKeyValueCallback onChange,
+      {Comparator<sc.WatchChange> sorter}) async {
+    util.log('Watching for changes on ${tbName}:${prefix}...');
+
+    // For safety, be certain that the syncbase table at tbName exists.
+    await createTable(db, tbName);
+
+    // In order to safely read all watch updates, we have to do a scan first.
+    // See https://github.com/vanadium/issues/issues/917
+    sc.SyncbaseBatchDatabase sbdb =
+        await db.beginBatch(sc.SyncbaseClient.batchOptions(readOnly: true));
+    List<int> resumeMarker;
+    try {
+      resumeMarker = await sbdb.getResumeMarker();
+
+      sc.SyncbaseTable tb = sbdb.table(tbName);
+
+      await tb
+          .scan(new sc.RowRange.prefix(prefix))
+          .forEach((sc.KeyValue kv) async {
+        String key = kv.key;
+        String value = UTF8.decode(kv.value);
+        print("Scan found ${key}, ${value}");
+        await onChange(key, value, true);
+      });
+    } finally {
+      await sbdb.abort();
+    }
+
+    // Now we can start watching from the batch's resume marker.
+    Stream<sc.WatchChange> watchStream = db.watch(tbName, prefix, resumeMarker);
+
+    // This list will queue up the changes of a watch batch sequence.
+    List<sc.WatchChange> watchSequence = new List<sc.WatchChange>();
+
+    // Define a change handler that will be applied whenever a WatchChange
+    // arrives on the watchStream.
+    _handleChange(sc.WatchChange wc) async {
+      // Accumulate the WatchChange's in watchSequence.
+      watchSequence.add(wc);
+      if (wc.continued) {
+        // Since there are more WatchChange's to collect, do not act yet.
+        return;
+      } else {
+        // 1. Possibly sort the watchSequence by the optional sorter function.
+        if (sorter != null) {
+          watchSequence.sort(sorter);
+        }
+
+        // 2. Then run through each value in order.
+        watchSequence.forEach((sc.WatchChange _w) async {
+          String key = _w.rowKey;
+          String value;
+          switch (_w.changeType) {
+            case sc.WatchChangeTypes.put:
+              value = UTF8.decode(_w.valueBytes);
+              break;
+            case sc.WatchChangeTypes.delete:
+              value = null;
+              break;
+            default:
+              assert(false);
+          }
+
+          print("Watch found ${key}, ${value}");
+          await onChange(key, value, false);
+        });
+
+        // 3. Then clear the watchSequence.
+        watchSequence.clear();
+      }
+    }
+
+    // Start listening on the watch stream using the change handler.
+    return watchStream.listen((sc.WatchChange wc) => _handleChange(wc));
+  }
 }
 
 // Example implementation of a ScanHandler.
diff --git a/lib/src/syncbase/log_writer.dart b/lib/src/syncbase/log_writer.dart
index bfb278e..a8ea950 100644
--- a/lib/src/syncbase/log_writer.dart
+++ b/lib/src/syncbase/log_writer.dart
@@ -41,9 +41,9 @@
   // Affects read/write/watch locations of the log writer.
   String logPrefix = ''; // This is usually set to <game_id>/log
 
-  // An internal boolean that should be set to true when watching and reset to
-  // false once watch should be turned off.
-  bool _watching = false;
+  // An internal StreamSubscription that should be canceled once we are done
+  // watching on this log.
+  StreamSubscription<WatchChange> _watchSubscription;
 
   // When a proposal is made (or received), this flag is set to true.
   // Once a consensus has been reached, this is set to false again.
@@ -81,74 +81,18 @@
     tb = await _cc.createTable(db, tbName);
 
     // Start to watch the stream.
-    Stream<WatchChange> watchStream =
-        db.watch(tbName, this.logPrefix, UTF8.encode("now"));
-    _startWatch(watchStream); // Don't wait for this future.
+    this._watchSubscription = await _cc.watchEverything(
+        db, tbName, this.logPrefix, _onChange,
+        sorter: (WatchChange a, WatchChange b) {
+      return a.rowKey.compareTo(b.rowKey);
+    });
   }
 
-  Future _startWatch(Stream<WatchChange> watchStream) async {
-    util.log('watching for changes...');
-    _watching = true;
-
-    List<WatchChange> watchSequence = new List<WatchChange>();
-
-    // This stream never really ends, so it will watch forever.
-    // https://github.com/vanadium/issues/issues/833
-    // To break out of the watch handler, we can use the _watching flag.
-    // However, the for loop will only break on the watch update after _watching
-    // is set to false.
-    await for (WatchChange wc in watchStream) {
-      if (!this._watching) {
-        break;
-      }
-
-      // Accumulate the WatchChange's in watchSequence.
-      watchSequence.add(wc);
-      if (wc.continued) {
-        // Since there are more WatchChange's to collect, do not act yet.
-        continue;
-      } else {
-        // 1. Sort the watchSequence by timestamp.
-        // Note: The rowKeys for the logPrefix can be sorted this way.
-        // It should not matter if proposals gets mixed up with writes.
-        watchSequence.sort((WatchChange a, WatchChange b) {
-          return a.rowKey.compareTo(b.rowKey);
-        });
-
-        // 2. Then run through each value in order.
-        watchSequence.forEach((WatchChange w) {
-          _handleWatchChange(w);
-        });
-
-        // 3. Then clear the watchSequence.
-        watchSequence.clear();
-      }
-    }
-  }
-
-  Future _handleWatchChange(WatchChange wc) async {
-    assert(wc.tableName == tbName);
-    util.log('Watch Key: ${wc.rowKey}');
-    util.log('Watch Value ${UTF8.decode(wc.valueBytes)}');
-    String key = wc.rowKey.replaceFirst("${this.logPrefix}/", "");
-    if (key == wc.rowKey) {
-      print("Lacks prefix '${this.logPrefix}/', skipping...");
-      return;
-    }
-    String value;
-    switch (wc.changeType) {
-      case WatchChangeTypes.put:
-        value = UTF8.decode(wc.valueBytes);
-        break;
-      case WatchChangeTypes.delete:
-        value = null;
-        break;
-      default:
-        assert(false);
-    }
+  Future _onChange(String rowKey, String value, bool duringScan) async {
+    String key = rowKey.replaceFirst("${this.logPrefix}/", "");
 
     if (_isProposalKey(key)) {
-      if (value != null && !_acceptedProposals.contains(key)) {
+      if (value != null && !_acceptedProposals.contains(key) && !duringScan) {
         await _receiveProposal(key, value);
       }
     } else {
@@ -158,7 +102,10 @@
   }
 
   void close() {
-    this._watching = false;
+    if (this._watchSubscription != null) {
+      this._watchSubscription.cancel();
+      this._watchSubscription = null;
+    }
   }
 
   Future write(SimulLevel s, String value) async {
diff --git a/lib/src/syncbase/settings_manager.dart b/lib/src/syncbase/settings_manager.dart
index 32f63a0..55a71b8 100644
--- a/lib/src/syncbase/settings_manager.dart
+++ b/lib/src/syncbase/settings_manager.dart
@@ -54,10 +54,14 @@
     tb = await _cc.createTable(db, util.tableNameSettings);
 
     // Start to watch the stream for the shared settings table.
-    Stream<sc.WatchChange> watchStream = db.watch(util.tableNameSettings,
-        util.settingsWatchSyncPrefix, UTF8.encode("now"));
-    _startWatchSettings(watchStream); // Don't wait for this future.
-    _loadSettings(tb); // Don't wait for this future.
+    await _cc.watchEverything(db, util.tableNameSettings,
+        util.settingsWatchSyncPrefix, _onSettingsChange);
+  }
+
+  // In the case of the settings manager, we're checking for any changes to
+  // any person's Croupier Settings.
+  Future _onSettingsChange(String key, String value, bool duringScan) async {
+    this.updateSettingsCallback(util.userIDFromSettingsDataKey(key), value);
   }
 
   // Guaranteed to be called when the program starts.
@@ -99,35 +103,6 @@
         .put(UTF8.encode(jsonString));
   }
 
-  // This watch method ensures that any changes are propagated to the caller.
-  // In the case of the settings manager, we're checking for any changes to
-  // any person's Croupier Settings.
-  Future _startWatchSettings(Stream<sc.WatchChange> watchStream) async {
-    util.log('Settings watching for changes...');
-    // This stream never really ends, so I guess we'll watch forever.
-    await for (sc.WatchChange wc in watchStream) {
-      assert(wc.tableName == util.tableNameSettings);
-      util.log('Watch Key: ${wc.rowKey}');
-      util.log('Watch Value ${UTF8.decode(wc.valueBytes)}');
-      String key = wc.rowKey;
-      String value;
-      switch (wc.changeType) {
-        case sc.WatchChangeTypes.put:
-          value = UTF8.decode(wc.valueBytes);
-          break;
-        case sc.WatchChangeTypes.delete:
-          value = null;
-          break;
-        default:
-          assert(false);
-      }
-
-      if (this.updateSettingsCallback != null) {
-        this.updateSettingsCallback(util.userIDFromSettingsDataKey(key), value);
-      }
-    }
-  }
-
   // Best called after load(), to ensure that there are settings in the table.
   Future createSettingsSyncgroup() async {
     int id = await _getUserID();
@@ -141,53 +116,32 @@
     return _cc.makeSyncgroupName(await _syncSettingsSuffix());
   }
 
-  // This watch method ensures that any changes are propagated to the caller.
-  // In this case, we're forwarding any player changes to the Croupier logic.
-  // We also catch the game status signals.
-  Future _startWatchGame(Stream<sc.WatchChange> watchStream) async {
-    util.log('Game watching for changes...');
-    // This stream never really ends, so I guess we'll watch forever.
-    await for (sc.WatchChange wc in watchStream) {
-      assert(wc.tableName == util.tableNameGames);
-      util.log('Watch Key: ${wc.rowKey}');
-      util.log('Watch Value ${UTF8.decode(wc.valueBytes)}');
-      String key = wc.rowKey;
-      String value;
-      switch (wc.changeType) {
-        case sc.WatchChangeTypes.put:
-          value = UTF8.decode(wc.valueBytes);
-          break;
-        case sc.WatchChangeTypes.delete:
-          value = null;
-          break;
-        default:
-          assert(false);
+  // Forward any player changes and game status signals to Croupier's logic.
+  // TODO(alexfandrianto): This also watches the log (but doesn't process it.
+  Future _onGameChange(String key, String value, bool duringScan) async {
+    if (key.indexOf("/players") != -1) {
+      if (this.updatePlayerFoundCallback != null) {
+        String type = util.playerUpdateTypeFromPlayerKey(key);
+        switch (type) {
+          case "player_number":
+            // Update the player number for this player.
+            this.updatePlayerFoundCallback(key, value);
+            break;
+          case "settings_sg":
+            // Join this player's settings syncgroup.
+            _cc.joinSyncgroup(value);
+
+            // Also, signal that this player has been found.
+            this.updatePlayerFoundCallback(key, null);
+            break;
+          default:
+            print("Unexpected key: ${key} with value ${value}");
+            assert(false);
+        }
       }
-
-      if (key.indexOf("/players") != -1) {
-        if (this.updatePlayerFoundCallback != null) {
-          String type = util.playerUpdateTypeFromPlayerKey(key);
-          switch (type) {
-            case "player_number":
-              // Update the player number for this player.
-              this.updatePlayerFoundCallback(key, value);
-              break;
-            case "settings_sg":
-              // Join this player's settings syncgroup.
-              _cc.joinSyncgroup(value);
-
-              // Also, signal that this player has been found.
-              this.updatePlayerFoundCallback(key, null);
-              break;
-            default:
-              print("Unexpected key: ${key} with value ${value}");
-              assert(false);
-          }
-        }
-      } else if (key.indexOf("/status") != -1) {
-        if (this.updateGameStatusCallback != null) {
-          this.updateGameStatusCallback(key, value);
-        }
+    } else if (key.indexOf("/status") != -1) {
+      if (this.updateGameStatusCallback != null) {
+        this.updateGameStatusCallback(key, value);
       }
     }
   }
@@ -199,9 +153,8 @@
     sc.SyncbaseTable gameTable = await _cc.createTable(db, util.tableNameGames);
 
     // Watch for the players in the game.
-    Stream<sc.WatchChange> watchStream = db.watch(
-        util.tableNameGames, util.syncgamePrefix(gameID), UTF8.encode("now"));
-    _startWatchGame(watchStream); // Don't wait for this future.
+    await _cc.watchEverything(
+        db, util.tableNameGames, util.syncgamePrefix(gameID), _onGameChange);
 
     print("Now writing to some rows of ${gameID}");
     // Start up the table and write yourself as player 0.
@@ -231,9 +184,8 @@
     sc.SyncbaseTable gameTable = await _cc.createTable(db, util.tableNameGames);
 
     // Watch for the players in the game.
-    Stream<sc.WatchChange> watchStream = db.watch(
-        util.tableNameGames, util.syncgamePrefix(gameID), UTF8.encode("now"));
-    _startWatchGame(watchStream); // Don't wait for this future.
+    await _cc.watchEverything(
+        db, util.tableNameGames, util.syncgamePrefix(gameID), _onGameChange);
 
     await _cc.joinSyncgroup(sgName);
 
@@ -260,20 +212,6 @@
     await gameTable.row(util.gameStatusKey(gameID)).put(UTF8.encode(status));
   }
 
-  // When starting the settings manager, there may be settings already in the
-  // store. Make sure to load those.
-  Future _loadSettings(sc.SyncbaseTable tb) async {
-    tb
-        .scan(new sc.RowRange.prefix(util.settingsWatchSyncPrefix))
-        .forEach((sc.KeyValue kv) {
-      if (util.isSettingsKey(kv.key)) {
-        // Then we can process the value as if it were settings data.
-        this.updateSettingsCallback(
-            util.userIDFromSettingsDataKey(kv.key), UTF8.decode(kv.value));
-      }
-    });
-  }
-
   // TODO(alexfandrianto): It is possible that the more efficient way of
   // scanning is to do it for only short bursts. In that case, we should call
   // stopScanSettings a few seconds after starting it.
diff --git a/lib/src/syncbase/util.dart b/lib/src/syncbase/util.dart
index 9523947..af882ba 100644
--- a/lib/src/syncbase/util.dart
+++ b/lib/src/syncbase/util.dart
@@ -2,6 +2,7 @@
 // Use of this source code is governed by a BSD-style
 // license that can be found in the LICENSE file.
 
+import 'dart:async';
 import 'package:syncbase/syncbase_client.dart' show Perms, SyncbaseClient;
 
 const String appName = 'app';
@@ -26,6 +27,7 @@
 
 typedef void NoArgCb();
 typedef void keyValueCallback(String key, String value);
+typedef Future asyncKeyValueCallback(String key, String value, bool duringScan);
 
 const String openPermsJson =
     '{"Admin":{"In":["..."]},"Write":{"In":["..."]},"Read":{"In":["..."]},"Resolve":{"In":["..."]},"Debug":{"In":["..."]}}';
diff --git a/shortcut_template b/shortcut_template
index a27b681..5cd8e15 100644
--- a/shortcut_template
+++ b/shortcut_template
@@ -4,7 +4,7 @@
 --verbose
 --args-for=mojo:flutter --enable-checked-mode
 --enable-multiprocess
---map-origin=https://mojo2.v.io=https://storage.googleapis.com/mojo_services/v23discovery/android/
+--map-origin=https://mojo2.v.io=https://storage.googleapis.com/mojo_services/v23discovery/mojo_services/android/
 --args-for=https://mojo2.v.io/discovery.mojo host%DEVICE_ID% mdns
---map-origin=https://mojo.v.io=https://storage.googleapis.com/mojo_services/syncbase/android/
---args-for=https://mojo.v.io/syncbase_server.mojo --v=0 --logtostderr=true --root-dir=/data/data/org.chromium.mojo.shell/app_home/syncbase_data --v23.credentials=/sdcard/v23creds --v23.proxy=proxy %SYNCBASE_NAME_FLAG%
+--map-origin=https://mojo.v.io=https://storage.googleapis.com/mojo_services/syncbase/mojo_services/android/
+--args-for=https://mojo.v.io/syncbase_server.mojo --v=0 --logtostderr=true --root-dir=/data/data/org.chromium.mojo.shell/app_home/syncbase_data --v23.credentials=/sdcard/v23creds %SYNCBASE_NAME_FLAG%