croupier: Sort Watch Updates to LogWriter

Only the LogWriter needs this since it cares heavily about the order
in which things are updated.

Tested by playing Solitaire with another peer and making many quick
moves. Originally, this would eventually lead to a failure (due to
not sorting the watch updates). But now, it plays just fine.

Closes https://github.com/vanadium/issues/issues/844

Change-Id: If7660bbfbe7d7992f90641afcf497d55bdbbff8e
diff --git a/lib/src/syncbase/log_writer.dart b/lib/src/syncbase/log_writer.dart
index 90fedc3..a588731 100644
--- a/lib/src/syncbase/log_writer.dart
+++ b/lib/src/syncbase/log_writer.dart
@@ -89,6 +89,8 @@
     util.log('watching for changes...');
     _watching = true;
 
+    List<WatchChange> watchSequence = new List<WatchChange>();
+
     // This stream never really ends, so it will watch forever.
     // https://github.com/vanadium/issues/issues/833
     // To break out of the watch handler, we can use the _watching flag.
@@ -99,37 +101,61 @@
         break;
       }
 
-      assert(wc.tableName == tbName);
-      util.log('Watch Key: ${wc.rowKey}');
-      util.log('Watch Value ${UTF8.decode(wc.valueBytes)}');
-      String key = wc.rowKey.replaceFirst("${this.logPrefix}/", "");
-      if (key == wc.rowKey) {
-        print("Lacks prefix '${this.logPrefix}/', skipping...");
+      // Accumulate the WatchChange's in watchSequence.
+      watchSequence.add(wc);
+      if (wc.continued) {
+        // Since there are more WatchChange's to collect, do not act yet.
         continue;
-      }
-      String value;
-      switch (wc.changeType) {
-        case WatchChangeTypes.put:
-          value = UTF8.decode(wc.valueBytes);
-          break;
-        case WatchChangeTypes.delete:
-          value = null;
-          break;
-        default:
-          assert(false);
-      }
-
-      if (_isProposalKey(key)) {
-        if (value != null && !_acceptedProposals.contains(key)) {
-          await _receiveProposal(key, value);
-        }
       } else {
-        print("Update callback: ${key}, ${value}");
-        this.updateCallback(key, value);
+        // 1. Sort the watchSequence by timestamp.
+        // Note: The rowKeys for the logPrefix can be sorted this way.
+        // It should not matter if proposals gets mixed up with writes.
+        watchSequence.sort((WatchChange a, WatchChange b) {
+          return a.rowKey.compareTo(b.rowKey);
+        });
+
+        // 2. Then run through each value in order.
+        watchSequence.forEach((WatchChange w) {
+          _handleWatchChange(w);
+        });
+
+        // 3. Then clear the watchSequence.
+        watchSequence.clear();
       }
     }
   }
 
+  Future _handleWatchChange(WatchChange wc) async {
+    assert(wc.tableName == tbName);
+    util.log('Watch Key: ${wc.rowKey}');
+    util.log('Watch Value ${UTF8.decode(wc.valueBytes)}');
+    String key = wc.rowKey.replaceFirst("${this.logPrefix}/", "");
+    if (key == wc.rowKey) {
+      print("Lacks prefix '${this.logPrefix}/', skipping...");
+      return;
+    }
+    String value;
+    switch (wc.changeType) {
+      case WatchChangeTypes.put:
+        value = UTF8.decode(wc.valueBytes);
+        break;
+      case WatchChangeTypes.delete:
+        value = null;
+        break;
+      default:
+        assert(false);
+    }
+
+    if (_isProposalKey(key)) {
+      if (value != null && !_acceptedProposals.contains(key)) {
+        await _receiveProposal(key, value);
+      }
+    } else {
+      print("Update callback: ${key}, ${value}");
+      this.updateCallback(key, value);
+    }
+  }
+
   void close() {
     this._watching = false;
   }