Implementing multi-trip backend
Change-Id: I7088e97cd39b9fec5ac732a8f704bbe6d95bc9cb
diff --git a/Makefile b/Makefile
index da440ef..49f09c4 100644
--- a/Makefile
+++ b/Makefile
@@ -84,3 +84,11 @@
.PHONY: clean-tmp
clean-tmp:
rm -rf tmp
+
+.PHONY: clean-syncbase
+clean-syncbase:
+ rm -rf tmp/syncbase*
+
+.PHONY: clean-creds
+clean-creds:
+ rm -rf tmp/creds
diff --git a/src/debug.js b/src/debug.js
index 8e47959..040bef0 100644
--- a/src/debug.js
+++ b/src/debug.js
@@ -12,10 +12,6 @@
global.$ = $;
}
-debug.log = function(message) {
- if (console.debug) {
- console.debug(message);
- }
-};
+debug.log = console.debug? console.debug.bind(console) : $.noop;
module.exports = debug;
diff --git a/src/static/index.css b/src/static/index.css
index a0e7517..46adc30 100644
--- a/src/static/index.css
+++ b/src/static/index.css
@@ -154,7 +154,9 @@
.messages.headlines a {
background-color: rgba(255, 255, 255, .8);
border-radius: 2px;
+ display: inline-block;
padding: 1px 3px;
+ text-indent: initial;
}
.messages.headlines li.history {
diff --git a/src/strings.js b/src/strings.js
index 110e555..5ce352c 100644
--- a/src/strings.js
+++ b/src/strings.js
@@ -55,6 +55,10 @@
return 'Declined invite from ' + sender + ' to join ' +
ownerOfTrip(sender, owner) + ' trip.';
},
+ invitationDismissed: function(sender, owner) {
+ return sender + ' has invited you to join ' + ownerOfTrip(sender, owner) +
+ ' trip. (Expired)';
+ },
invitationReceived: function(sender, owner) {
return text(sender + ' has invited you to join ' +
ownerOfTrip(sender, owner) + ' trip. ') +
diff --git a/src/travel.js b/src/travel.js
index 9aae46d..510c0c1 100644
--- a/src/travel.js
+++ b/src/travel.js
@@ -280,31 +280,55 @@
}
},
+ dismissInvite: function(owner, sender) {
+ var invite = this.invites[owner];
+ if (invite) {
+ invite.resolve(strings.invitationDismissed(sender, owner));
+ delete this.invites[owner];
+ }
+ },
+
handleInvite: function(owner, recipient, sender) {
var self = this;
var invitationManager = this.sync.invitationManager;
var me = invitationManager.getUsername();
if (recipient === me) {
+ this.dismissInvite(owner, sender);
+
var message = new Message();
message.setType(Message.INFO);
message.setHtml(strings.invitationReceived(sender, owner));
message.setPromise(new Promise(function(resolve, reject) {
+ self.invites[owner] = {
+ resolve: resolve,
+ reject: reject
+ };
+
message.$.find('a[name=accept]').click(function() {
invitationManager.accept(owner).then(function() {
+ delete self.invites[owner];
return strings.invitationAccepted(sender, owner);
}).then(resolve, reject);
return false;
});
message.$.find('a[name=decline]').click(function() {
invitationManager.decline(owner).then(function() {
+ delete self.invites[owner];
return strings.invitationDeclined(sender, owner);
}).then(resolve, reject);
return false;
});
}));
- self.messages.push(message);
+ this.messages.push(message);
+ }
+ },
+
+ handleInviteDismiss: function(owner, recipient, sender) {
+ var me = this.sync.invitationManager.getUsername();
+ if (recipient === me) {
+ this.dismissInvite(owner, sender);
}
},
@@ -354,6 +378,8 @@
opts = opts || {};
var vanadiumWrapper = opts.vanadiumWrapper || vanadiumWrapperDefault;
+ this.invites = {};
+
var destinations = this.destinations = new Destinations();
destinations.onAdd.add(this.handleDestinationAdd);
destinations.onRemove.add(this.handleDestinationRemove);
@@ -413,6 +439,7 @@
});
sync.invitationManager.onInvite.add(this.handleInvite);
+ sync.invitationManager.onDismiss.add(this.handleInviteDismiss);
messages.onMessage.add(this.handleUserMessage);
diff --git a/src/travelsync.js b/src/travelsync.js
index 25d5f3a..3fedd67 100644
--- a/src/travelsync.js
+++ b/src/travelsync.js
@@ -42,14 +42,16 @@
},
message: function(messageContent) {
+ var self = this;
+
var id = uuid.v4();
var payload = $.extend({
timestamp: Date.now()
}, messageContent);
var value = this.marshal(payload);
- this.startup.then(function(services) {
- return services.syncbase.put(['messages', id], value);
+ this.startSyncbase.then(function(syncbase) {
+ return syncbase.put(['trips', self.upstreamId, 'messages', id], value);
}).catch(this.onError);
},
@@ -57,6 +59,20 @@
},
pushStatus: function() {
+ },
+
+ setActiveTrip: function(tripId) {
+ var self = this;
+
+ this.activeTripId = tripId;
+ /* We could use cached state, but we don't want to do state updates in
+ * response to a pull while any writes are going on. */
+ return this.startSyncbase.then(function(syncbase) {
+ syncbase.put(['user', 'tripMetadata', tripId, 'latestSwitch'],
+ Date.now()).catch(self.onError);
+
+ return syncbase.refresh();
+ });
}
},
@@ -103,11 +119,12 @@
var self = this;
if (this.isValid()) {
- var key = ['destinations', this.id];
+ var key = this.key();
+ var fieldIdx = key.length;
var writes = [];
$.each(DESTINATION_SCHEMA, function() {
- key[2] = this;
+ key[fieldIdx] = this;
var value = self.data[this];
writes.push(value?
dao.put(key, outer.marshal(value)) : dao.delete(key));
@@ -120,13 +137,19 @@
delete: function(dao) {
if (this.isValid()) {
- return dao.delete(['destinations', this.id]);
+ return dao.delete(this.key());
} else {
return Promise.resolve();
}
},
},
+ privates: {
+ key: function() {
+ return ['trips', this.outer.upstreamId, 'destinations', this.id];
+ }
+ },
+
events: {
/**
* Utility event to allow asynchronous update processes to cancel if
@@ -147,16 +170,16 @@
}),
batch: function(fn) {
- this.startup.then(function(services) {
- return services.syncbase.batch(fn);
+ this.startSyncbase.then(function(syncbase) {
+ return syncbase.batch(fn);
}).catch(this.onError);
},
nonBatched: function(fn) {
var self = this; //not really necessary but semantically correct
var fnArgs = Array.prototype.slice.call(arguments, 1);
- this.startup.then(function(services) {
- fnArgs.splice(0, 0, services.syncbase);
+ this.startSyncbase.then(function(syncbase) {
+ fnArgs.splice(0, 0, syncbase);
return fn.apply(self, fnArgs);
}).catch(this.onError);
},
@@ -176,7 +199,7 @@
this.destRecords.splice(index, 0, record);
- if (this.hasUpstream) {
+ if (this.hasValidUpstream()) {
this.batch(function(ops) {
return Promise.all([
self.putDestinationIds(ops),
@@ -194,7 +217,7 @@
var index = destination.getIndex();
var removed = this.destRecords.splice(index, 1)[0];
- if (this.hasUpstream && removed.isValid()) {
+ if (this.hasValidUpstream() && removed.isValid()) {
debug.log('Removing destination ' + index + ':' + removed.getId());
this.batch(function(ops) {
return Promise.all([
@@ -215,7 +238,7 @@
if (record && record.isValid()) {
record.setPlaceData(placeData, function(placeData, oldPlace) {
- if (self.hasUpstream) {
+ if (self.hasValidUpstream()) {
debug.log('Updating destination ' + index + ':' + this.getId() +
'.place = ' + JSON.stringify(oldPlace) + ' => ' +
JSON.stringify(placeData));
@@ -226,10 +249,20 @@
}
},
- pushDestinations: function() {
+ pushDestinations: function(force) {
var self = this;
this.batch(function(ops) {
+ if (!self.activeTripId) {
+ if (force) {
+ self.activeTripId = uuid.v4();
+ } else {
+ return;
+ }
+ }
+
+ self.setUpstream();
+
var asyncs = self.destRecords.map(function(record) {
return record.put(ops);
});
@@ -248,7 +281,8 @@
var ids = this.destRecords
.filter(function(r) { return r.isValid(); })
.map(function(r) { return r.getId(); });
- return dao.put(['destinations'], this.marshal(ids));
+ return dao.put(['trips', this.upstreamId, 'destinations'],
+ this.marshal(ids));
},
marshal: function(x) {
@@ -256,7 +290,7 @@
},
unmarshal: function(x) {
- return JSON.parse(x);
+ return x && JSON.parse(x);
},
truncateDestinations: function(targetLength) {
@@ -295,11 +329,16 @@
}
},
+ getDestinationIds: function(destinationsData) {
+ return this.unmarshal(typeof destinationsData === 'object'?
+ destinationsData._ : destinationsData);
+ },
+
processDestinations: function(destinationsData) {
var self = this;
if (!destinationsData) {
- if (this.hasUpstream) {
+ if (this.hasValidUpstream()) {
this.truncateDestinations(0);
} else {
//first push with no remote data; push local data as authority
@@ -309,11 +348,14 @@
} else {
var ids;
try {
- ids = this.unmarshal(destinationsData._ || destinationsData);
+ ids = this.getDestinationIds(destinationsData);
+ if (!ids) {
+ throw new TypeError('Missing destination IDs');
+ }
} catch(e) {
this.onError(e);
//assume it's corrupt and overwrite
- this.pushDestinations();
+ this.pushDestinations(true);
return;
}
@@ -372,16 +414,124 @@
});
if (this.destRecords.length > ids.length) {
+ /* TODO(rosswang): There is an edge case where this happens due to
+ * user interaction even though normally pulls are blocked while
+ * writes are outstanding. This can probably also happen on startup.
+ * Debug this or better yet make it go away. */
this.truncateDestinations(ids.length);
}
}
- this.hasUpstream = true;
+ this.setUpstream();
+ },
+
+ deleteTrip: function(tripId) {
+ this.batch(function(ops) {
+ return Promise.all([
+ ops.delete(['user', 'tripMetadata', tripId]),
+ ops.delete(['trips', tripId])
+ ]);
+ });
+ },
+
+ /**
+ * Given a mapping of trip IDs to trip info with metadata, pick the trip
+ * that the user is most likely to care about.
+ */
+ getDefaultTrip: function(userTripMetadata, trips) {
+ var self = this;
+ var best = {};
+
+ $.each(trips, function(id, trip) {
+ var md = userTripMetadata && userTripMetadata[id];
+ var latestSwitch = md && md.latestSwitch;
+
+ function usurp() {
+ best.trip = trip;
+ best.id = id;
+ best.latestSwitch = latestSwitch;
+ delete best.length;
+ }
+
+ if (latestSwitch === best.latestSwitch) {
+ if (!best.trip) {
+ usurp();
+ } else {
+ if (best.length === undefined) {
+ best.length = self.getDestinationIds(
+ best.trip.destinations).length;
+ }
+ var length = self.getDestinationIds(trip.destinations).length;
+ if (length > best.length) {
+ usurp();
+ best.length = length;
+ } else if (length === best.length && id < best.id) {
+ usurp();
+ best.length = length;
+ }
+ }
+ } else if (latestSwitch && best.latestSwitch === undefined ||
+ latestSwitch > best.latestSwitch) {
+ usurp();
+ }
+ });
+
+ return best.id;
+ },
+
+ isNascent: function(trip) {
+ return !trip.destinations ||
+ this.getDestinationIds(trip.destinations).length <= 1;
+ },
+
+ processTrips: function(userTripMetadata, trips) {
+ var trip;
+
+ if (this.activeTripId) {
+ trip = trips && trips[this.activeTripId];
+ if (!trip) {
+ debug.log('Last active trip ' + this.activeTripId +
+ ' is no longer present.');
+ } else if (this.isNascent(trip)) {
+ var establishedId = this.getDefaultTrip(userTripMetadata, trips);
+ if (establishedId && establishedId !== this.activeTripId &&
+ trips[establishedId]) {
+ this.deleteTrip(this.activeTripId);
+
+ debug.log('Replacing nascent trip ' + this.activeTripId +
+ ' with established trip ' + establishedId);
+ this.activeTripId = establishedId;
+ trip = trips[establishedId];
+ }
+ }
+ }
+
+ if (!trip) {
+ if (trips) {
+ this.activeTripId = this.getDefaultTrip(userTripMetadata, trips);
+ debug.log('Setting active trip ' + this.activeTripId);
+ trip = trips[this.activeTripId];
+ } else {
+ this.activeTripId = uuid.v4();
+ debug.log('Creating new trip ' + this.activeTripId);
+ trip = {};
+ }
+ }
+
+ this.processMessages(trip.messages);
+ this.processDestinations(trip.destinations);
},
processUpdates: function(data) {
- this.processMessages(data.messages);
- this.processDestinations(data.destinations);
+ this.processTrips(data.user && data.user.tripMetadata, data.trips);
+ },
+
+ hasValidUpstream: function() {
+ return this.upstreamId && this.upstreamId === this.activeTripId;
+ },
+
+ setUpstream: function() {
+ this.upstreamId = this.activeTripId;
},
serve: function(args) {
@@ -489,7 +639,7 @@
this.server = new vdlTravel.TravelSync();
var startRpc = prereqs.then(this.serve);
- var startSyncbase = prereqs.then(this.connectSyncbase);
+ var startSyncbase = this.startSyncbase = prereqs.then(this.connectSyncbase);
var startSyncgroupManager = Promise
.all([prereqs, startSyncbase])
.then(function(args) {
diff --git a/src/vanadium-wrapper/syncbase-wrapper.js b/src/vanadium-wrapper/syncbase-wrapper.js
index 974720b..92ba622 100644
--- a/src/vanadium-wrapper/syncbase-wrapper.js
+++ b/src/vanadium-wrapper/syncbase-wrapper.js
@@ -116,11 +116,16 @@
}
};
- fn.call(ops, ops).then(function(result) {
- return cb(null, result);
- }, function(err) {
- return cb(err);
- });
+ var p = fn.call(ops, ops);
+ if (p) {
+ p.then(function(result) {
+ return cb(null, result);
+ }, function(err) {
+ return cb(err);
+ });
+ } else {
+ cb();
+ }
}));
},
@@ -156,9 +161,10 @@
var current = this.pull.current;
if (!current) {
- current = this.pull.current = this.pull().then(function(v) {
+ current = this.pull.current = this.pull().then(function(data) {
self.pull.current = null;
- return v;
+ self.onUpdate(data);
+ return data;
}, function(err) {
self.pull.current = null;
throw err;
@@ -308,8 +314,7 @@
debug.log('Syncbase: aborting refresh due to writes');
resolve(self.pull()); //try/wait for idle again
} else {
- self.onUpdate(newData);
- resolve();
+ resolve(newData);
}
}).on('data', function(row) {
if (isHeader) {
@@ -371,10 +376,7 @@
}
setTimeout(self.watchLoop, 500);
};
- // TODO(rosswang): Right now sync fails if the initial db has a conflict, so
- // for now add a delay so that sync happens before we start db actions
- //process.nextTick(self.watchLoop);
- setTimeout(self.watchLoop, 2000);
+ process.nextTick(self.watchLoop);
}
});