blob: ca0abfbd676e6e1a264c9931c76260b712d78592 [file] [log] [blame]
// Copyright 2015 The Vanadium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
/// The goal of log writer is to generically manage game logs.
/// Syncbase will produce values that combine to form a List<GameCommand> while
/// the in-memory GameLog will also hold such a list.
///
/// Updating the GameLog from the Store/Syncbase:
/// GameLog will update to whatever Store data says.
/// If it merges, the game log, then it will write that information off.
/// Case A: Store is farther along than current state.
/// Continue.
/// Case B: Store is somehow behind the current state.
/// Update with the current state of the GameLog (if not sent yet).
/// Case C: Store's log branches off from the curernt GameLog.
/// Depending on phase, resolve the conflict differently and write the resolution.
///
/// Updating the Store:
/// When a new GameCommand is received (that doesn't contradict the existing log),
/// it is added to a list of pending changes and written to the local store.
/// Since this file includes Sky/Mojo, it will need to be mocked out for unit tests.
/// Unfortunately, imports can't be replaced, so the best thing to do is to swap out the whole file.
import 'dart:async';
import 'dart:convert' show UTF8, JSON;
import 'package:sky/services.dart' show embedder;
import 'package:ether/syncbase_client.dart'
show Perms, SyncbaseClient, SyncbaseTable, WatchChange, WatchChangeTypes;
enum SimulLevel{
TURN_BASED,
INDEPENDENT,
DEPENDENT
}
typedef void updateCallbackT(String key, String value);
log(String msg) {
DateTime now = new DateTime.now();
print('$now $msg');
}
Perms emptyPerms() => new Perms()..json = '{}';
class LogWriter {
final updateCallbackT updateCallback; // Takes in String key, String value
final List<int> users;
final SyncbaseClient _syncbaseClient;
bool inProposalMode = false;
Map<String, String> proposalsKnown; // Only updated via watch.
int _associatedUser;
int get associatedUser => _associatedUser;
void set associatedUser(int other) {
// Can be changed while the log is used; this should not be done during a proposal.
assert(!inProposalMode);
_associatedUser = other;
}
LogWriter(this.updateCallback, this.users)
: _syncbaseClient = new SyncbaseClient(embedder.connectToService,
'https://mojo.v.io/syncbase_server.mojo');
int seq = 0;
SyncbaseTable tb;
String sendMsg, recvMsg, putStr, getStr;
Future _doSyncbaseInit() async {
log('LogWriter.doSyncbaseInit');
if (tb != null) {
log('syncbase already initialized');
return;
}
var app = _syncbaseClient.app('app');
if (!(await app.exists())) {
await app.create(emptyPerms());
}
var db = app.noSqlDatabase('db');
if (!(await db.exists())) {
await db.create(emptyPerms());
}
var table = db.table('table');
if (!(await table.exists())) {
await table.create(emptyPerms());
}
tb = table;
log('syncbase is now initialized');
// Start to watch the stream.
Stream<WatchChange> watchStream = db.watch('table', '', await db.getResumeMarker());
_startWatch(watchStream); // Don't wait for this future.
}
Future _startWatch(Stream<WatchChange> watchStream) async {
log('watching for changes...');
// This stream never really ends, so I guess we'll watch forever.
await for (WatchChange wc in watchStream) {
assert(wc.tableName == 'table');
log('Watch Key: ${wc.rowName}');
log('Watch Value ${UTF8.decode(wc.valueBytes)}');
String key = wc.rowName;
String value;
switch (wc.changeType) {
case WatchChangeTypes.put:
value = UTF8.decode(wc.valueBytes);
break;
case WatchChangeTypes.delete:
value = null;
break;
default:
assert(false);
}
if (_isProposalKey(key)) {
if (value != null) {
await _receiveProposal(key, value);
}
} else {
print("Update callback: ${key}, ${value}");
this.updateCallback(key, value);
}
}
}
Future write(SimulLevel s, String value) async {
log('LogWriter.write start');
await _doSyncbaseInit();
assert(!inProposalMode);
String key = _logKey(associatedUser);
if (s == SimulLevel.DEPENDENT) {
inProposalMode = true;
proposalsKnown = new Map<String, String>();
String proposalData = JSON.encode({"key": key, "value": value});
String propKey = _proposalKey(associatedUser);
await _writeData(propKey, proposalData);
proposalsKnown[propKey] = proposalData;
// TODO(alexfandrianto): Remove when we have 4 players going at once.
// For quick development purposes, we may wish to keep this block.
// FAKE: Do some bonus work. Where "everyone else" accepts the proposal.
// Normally, one would rely on watch and the syncgroup peers to do this.
for (int i = 0; i < users.length; i++) {
if (users[i] != associatedUser) {
// DO NOT AWAIT HERE. It must be done "asynchronously".
_writeData(_proposalKey(users[i]), proposalData);
}
}
return;
}
await _writeData(key, value);
}
// Helper that writes data to the "store" and calls the update callback.
Future _writeData(String key, String value) async {
var row = tb.row(key);
await row.put(UTF8.encode(value));
}
/*
// _readData could be helpful eventually, but it's not needed yet.
Future<String> _readData(String key) async {
var row = tb.row(key);
if (!(await row.exists())) {
print("${key} did not exist");
return null;
}
var getBytes = await row.get();
return UTF8.decode(getBytes);
}
*/
Future _deleteData(String key) async {
var row = tb.row(key);
await row.delete();
}
// Helper that returns the log key using a mixture of timestamp + user.
String _logKey(int user) {
int ms = new DateTime.now().millisecondsSinceEpoch;
String key = "${ms}-${user}";
return key;
}
// Helper that handles a proposal update for the associatedUser.
Future _receiveProposal(String key, String proposalData) async {
assert(inProposalMode);
// Let us update our proposal map.
proposalsKnown[key] = proposalData;
// First check if something is already in data.
var pKey = _proposalKey(associatedUser);
var pData = proposalsKnown[pKey];
if (pData != null) {
// Potentially change your proposal, if that person has higher priority.
Map<String, String> pp = JSON.decode(pData);
Map<String, String> op = JSON.decode(proposalData);
String keyP = pp["key"];
String keyO = op["key"];
if (keyO.compareTo(keyP) < 0) {
// Then switch proposals.
await _writeData(pKey, proposalData);
}
} else {
// Otherwise, you have no proposal, so take theirs.
await _writeData(pKey, proposalData);
}
// Given these changes, check if you can commit the full batch.
if (await _checkIsProposalDone()) {
Map<String, String> pp = JSON.decode(pData);
String key = pp["key"];
String value = pp["value"];
print("All proposals accepted. Proceeding with ${key} ${value}");
// WOULD DO A BATCH!
for (int i = 0; i < users.length; i++) {
await _deleteData(_proposalKey(users[i]));
}
await _writeData(key, value);
proposalsKnown = null;
inProposalMode = false;
}
}
// More helpers for proposals.
bool _isProposalKey(String key) {
return key.indexOf("proposal") == 0;
}
String _proposalKey(int user) {
return "proposal${user}";
}
Future<bool> _checkIsProposalDone() async {
assert(inProposalMode);
String theProposal;
for (int i = 0; i < users.length; i++) {
String altProposal = proposalsKnown[_proposalKey(users[i])];
if (altProposal == null) {
return false;
} else if (theProposal != null && theProposal != altProposal) {
return false;
}
theProposal = altProposal;
}
return true;
}
}