blob: 6b7d235d5180aafb069ebccd4c7af29f811b0bb3 [file] [log] [blame]
// Copyright 2016 The Vanadium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
library pingpong_test;
import 'dart:async';
import 'dart:convert' show UTF8;
import 'dart:math' as math;
import 'package:syncbase/syncbase_client.dart' as sc;
const String mtName = '/ns.dev.v.io:8101/tmp/benchmark/pingpong';
const String appName = 'app';
const String dbName = 'db';
const String tbName = 'tb';
const String syncPrefix = 'prefix';
const String sgSuffix = 'sg';
const String syncTimeSuffix = 'synctime';
const String openPermsJson =
'{"Admin":{"In":["..."]},"Write":{"In":["..."]},"Read":{"In":["..."]},"Resolve":{"In":["..."]},"Debug":{"In":["..."]}}';
sc.Perms openPerms = sc.SyncbaseClient.perms(openPermsJson);
enum PingPongPattern {
pingPong, // The Creator and first Joiner ping pong with each other.
roundRobin, // Creator pings each peer in turn.
reverseRobin, // Creator pings each peer in turn in reverse order.
cycle, // Every peer pings the next peer in line.
reverseCycle, // Every peer pings the previous peer in line.
pingRandom, // Peers randomly select who to ping next.
pingCreator, // The Creator ping pongs with all peers simultaneously.
pingAll, // All peers ping pong with all other peers simultaneously.
}
PingPongPattern parsePattern(String pattern) {
switch (pattern) {
case 'PingPong':
return PingPongPattern.pingPong;
case 'RoundRobin':
return PingPongPattern.roundRobin;
case 'ReverseRobin':
return PingPongPattern.reverseRobin;
case 'Cycle':
return PingPongPattern.cycle;
case 'ReverseCycle':
return PingPongPattern.reverseCycle;
case 'PingRandom':
return PingPongPattern.pingRandom;
case 'PingCreator':
return PingPongPattern.pingCreator;
case 'PingAll':
return PingPongPattern.pingAll;
default:
assert(false);
return null;
}
}
// The Ping Pong Benchmark will run with a different configuration depending on
// the arguments given. The results will be printed to stdout.
Future runPingPongTest(List<String> args, sc.SyncbaseClient c) async {
// TODO(alexfandrianto): There are a variety of extra conditions we can add.
// For example, we can add background activity, extra syncgroups, or load.
// We can even take down some of the peers.
// Read args.
// 0: handle
// 1: testID. Unique test ID to differentiate between test runs.
// 2: peerID. Your ID for this benchmark. 0 is the creator.
// 3: numPeers. The number of peers participating in this benchmark.
// 4: pattern. The way that Ping Pong will be executed.
// 5: numTimes. The number of times we will run through the pattern.
String testID = args[1];
int peerID = int.parse(args[2]);
int numPeers = int.parse(args[3]);
PingPongPattern pattern = parsePattern(args[4]);
int numTimes = int.parse(args[5]);
// Set up the basics of the app, db, and table.
sc.SyncbaseApp app = c.app(appName);
if (!(await app.exists())) {
await app.create(openPerms);
}
sc.SyncbaseDatabase db = app.noSqlDatabase(dbName);
if (!(await db.exists())) {
await db.create(openPerms);
}
sc.SyncbaseTable tb = db.table(tbName);
if (!(await tb.exists())) {
await tb.create(openPerms);
}
// TODO(alexfandrianto): We should add a now resume marker constant.
// https://github.com/vanadium/issues/issues/1155
Stream<sc.WatchChange> watchStream =
db.watch(tbName, syncPrefix, UTF8.encode('now'));
// Devices will find each other at this sgName.
// Note: We can also accomplish this via discovery, but for simplicity, we
// will use a common location on the global mount table.
String sgName = '${mtName}/${testID}/s0/%%sync/sg';
sc.SyncbaseSyncgroup sg = db.syncgroup(sgName);
var syncInfo = sc.SyncbaseClient.syncgroupMemberInfo(syncPriority: 3);
// Create the syncgroup or join it, as needed.
// Note: It is slightly preferable to start the creator earlier than the other
// peers in order to avoid racing join vs create.
if (peerID == 0) {
print('Creating Syncgroup ${sgName} as peer 0.');
var syncSpec = sc.SyncbaseClient.syncgroupSpec(
[sc.SyncbaseClient.syncgroupPrefix(tbName, syncPrefix)],
description: 'test syncgroup',
perms: openPerms,
mountTables: [mtName]);
// TODO(alexfandrianto): Why does this syncgroup create appear to take about
// 1 minute before succeeding?
// https://github.com/vanadium/issues/issues/1158
await sg.create(syncSpec, syncInfo);
} else {
print('Joining Syncgroup ${sgName} as peer ${peerID}.');
// TODO(alexfandrianto): Why does the first join fail so often?
// Same problem as create (except that join doesn't retry)?
// Is something slowly trying to mount? Or is it something else?
// https://github.com/vanadium/issues/issues/1162
bool success = false;
while (!success) {
try {
await sg.join(syncInfo);
success = true;
} catch(e) {
print('Failed to join. Waiting...');
await new Future.delayed(new Duration(seconds: 5));
print('Trying again...');
}
}
}
// After entering the syncgroup, we should watch the table.
print('Ready to time sync!');
// During this phase, there will be numPeers * (numPeers - 1) watch updates.
// Everybody will write a value to everyone else.
Completer initialCompleter = new Completer();
int count = 0;
int limit = numPeers * (numPeers - 1);
// Note: Dart only allows 1 listener for this kind of stream. Thus, we'll also
// have a doneCompleter for the test. It is accompanied by a few extra stats
// that will determine when the benchmark is complete.
Completer doneCompleter = new Completer();
// numSent tracks the number of times you've sent to a peer.
List<int> numSent = new List<int>.filled(numPeers, 0);
// numUpdates tracks the number of watch updates received so far.
// Sme patterns get more updates than other patterns.
int numUpdates = 0;
// Prepare the watch stream's combined listener.
// Phase 1 will involve syncing times to other devices using syncTimeSuffix.
// Phase 2 involves the actual test where peers follow the ping pong pattern.
watchStream.listen((sc.WatchChange wc) async {
assert(wc.changeType == sc.WatchChangeTypes.put);
if (doneCompleter.isCompleted) {
return;
}
String key = wc.rowKey;
String value = UTF8.decode(wc.valueBytes);
int senderID = senderFromKey(key);
int targetID = targetFromKey(key);
String suffix = suffixFromKey(key);
print(
'${senderID} => ${targetID}. Key ${key} and Value ${new DateTime.fromMillisecondsSinceEpoch(int.parse(value))}');
if (suffix == syncTimeSuffix) {
count++;
// We should make sure that this is a key that we need to care about.
// Main Questions: Is the target me? Should I respond?
if (targetID == peerID) {
if (senderID < peerID) {
await _writeTime(tb, peerID, senderID, syncTimeSuffix);
}
}
// This phase completes once we receive all the watch updates.
if (count == limit) {
initialCompleter.complete();
}
} else {
numUpdates++;
// If enough rounds have passed, complete!
if (_completedTimes(pattern, numPeers, numUpdates) >= numTimes) {
doneCompleter.complete();
return;
}
if (targetID == peerID) {
// After receiving something, the patterns ensure that it is this peer's
// turn to write next. It must find its target and the iteration number.
int newTarget = _computeTarget(pattern, senderID, targetID, numPeers);
int iteration =
_computeIteration(pattern, newTarget, numSent, numUpdates);
if (iteration > numTimes) {
// TODO(alexfandrianto): Is abruptly stopping like this problematic?
// We could just increase numTimes, right?
return; // don't write again
}
String suffix = '${iteration}';
numSent[newTarget]++;
_writeTime(tb, peerID, newTarget, suffix); // do not await this
}
}
});
// Phase 1: Synchronize clocks by having everyone send data to each other.
// Strategy: Send to those with a higher index than you.
// Send a response to those with a lower index than you.
for (int i = peerID + 1; i < numPeers; i++) {
await _writeTime(tb, peerID, i, syncTimeSuffix);
}
await initialCompleter.future;
print('Done with initial setup!');
// The remaining ping pongs had better finish within 2 seconds.
// Or else we have other problems.
await new Future.delayed(new Duration(seconds: 2));
// Phase 2: Initiate the ping pong pattern.
// The creator (peer 0) will usually be the sole initiator, though this can
// depend on the pattern selected.
print('Initial Writes...');
DateTime startTime = new DateTime.now();
for (int i = 0; i < numPeers; i++) {
if (_isInitialTarget(pattern, peerID, i, numPeers)) {
numSent[i]++;
_writeTime(tb, peerID, i, '0'); // do not await this
}
}
await doneCompleter.future;
// Benchmark complete! Print out the benchmark stats.
DateTime endTime = new DateTime.now();
print('Benchmark complete!');
print('Peer ${peerID} started at ${startTime} and ended at ${endTime}.');
int deltaMs = endTime.difference(startTime).inMilliseconds;
print('Completed ${numTimes} in ${deltaMs}. Avg: ${deltaMs / numTimes}');
// Give 2 more seconds to allow final syncs to occur...
// If this doesn't complete in 2 seconds, we have major problems.
await new Future.delayed(new Duration(seconds: 2));
}
bool _isInitialTarget(
PingPongPattern pattern, int senderID, int targetID, int numPeers) {
if (senderID == targetID) {
return false;
}
switch (pattern) {
case PingPongPattern.pingPong:
case PingPongPattern.roundRobin:
case PingPongPattern.cycle:
case PingPongPattern.pingRandom: // It's hard to pick just 1 peer randomly.
// PingRandom compromises by always picking peer 1.
return senderID == 0 && targetID == 1;
case PingPongPattern.reverseRobin:
case PingPongPattern.reverseCycle:
return senderID == 0 && targetID == numPeers - 1;
case PingPongPattern.pingCreator:
return senderID == 0; // send to everyone
case PingPongPattern.pingAll:
return senderID <
targetID; // go for higher indexes to ensure everyone gets pinged.
default:
assert(false);
return null;
}
}
int _computeTarget(
PingPongPattern pattern, int senderID, int targetID, int numPeers) {
switch (pattern) {
case PingPongPattern.pingPong:
case PingPongPattern.pingCreator:
case PingPongPattern.pingAll:
return senderID; // Just Ping/Pong back.
case PingPongPattern.roundRobin:
if (targetID == 0) {
// Need to Ping a new person.
int newTarget = (senderID + 1) % numPeers;
return newTarget == 0 ? 1 : newTarget;
}
return senderID; // Pong back to 0.
case PingPongPattern.reverseRobin:
if (targetID == 0) {
// Need to Ping a new person.
int newTarget = senderID - 1;
return newTarget == 0 ? numPeers - 1 : newTarget;
}
return senderID; // Pong back to 0.
case PingPongPattern.cycle:
return (targetID + 1) % numPeers;
case PingPongPattern.reverseCycle:
return (targetID - 1) % numPeers;
case PingPongPattern.pingRandom:
int newTarget;
while (newTarget == null || newTarget == targetID) {
// cannot target self
newTarget = new math.Random().nextInt(numPeers);
}
return newTarget;
default:
assert(false);
return null;
}
}
int _computeIteration(PingPongPattern pattern, int newTargetID,
List<int> numSent, int numUpdates) {
switch (pattern) {
case PingPongPattern.pingPong:
case PingPongPattern.roundRobin:
case PingPongPattern.reverseRobin:
return numUpdates ~/ 2;
case PingPongPattern.cycle:
case PingPongPattern.reverseCycle:
case PingPongPattern.pingRandom:
return numUpdates ~/ numSent.length;
case PingPongPattern.pingCreator:
case PingPongPattern.pingAll:
return numSent[
newTargetID]; // The iteration is the number of times you've sent.
default:
assert(false);
return null;
}
}
int _completedTimes(PingPongPattern pattern, int numPeers, int numUpdates) {
switch (pattern) {
case PingPongPattern.pingPong:
case PingPongPattern.roundRobin:
case PingPongPattern.reverseRobin:
return numUpdates ~/ 2;
case PingPongPattern.cycle:
case PingPongPattern.reverseCycle:
case PingPongPattern.pingRandom:
return numUpdates ~/ numPeers;
case PingPongPattern.pingCreator:
return numUpdates ~/ (2 * numPeers);
case PingPongPattern.pingAll:
return numUpdates ~/ (numPeers * (numPeers - 1));
default:
assert(false);
return null;
}
}
int senderFromKey(String key) => int.parse(key.split('/')[1]);
int targetFromKey(String key) => int.parse(key.split('/')[2]);
String suffixFromKey(String key) => key.split('/')[3];
Future _writeTime(
sc.SyncbaseTable tb, int peerID, int targetID, String suffix) async {
DateTime time = new DateTime.now(); // This should be DevModeGetTime.
String timeStr = '${time.millisecondsSinceEpoch}';
await tb.put(_computeKey(peerID, targetID, suffix), UTF8.encode(timeStr));
}
String _computeKey(int peerID, int targetID, String suffix) {
return '${syncPrefix}/${peerID}/${targetID}/${suffix}';
}