Factoring out facets of TravelSync
Change-Id: I9319ce7dbd23acddd1279e1b527dac13e7c2b20e
diff --git a/src/invitation-manager.js b/src/invitation-manager.js
index fc98c76..86399db 100644
--- a/src/invitation-manager.js
+++ b/src/invitation-manager.js
@@ -150,13 +150,9 @@
onError: 'memory'
},
- /**
- * @param prereqs promise of { identity, mountNames, vanadiumWrapper }
- */
- init: function(prereqs, groupManagerPromise) {
+ init: function(usernamePromise, groupManagerPromise) {
var self = this;
- this.prereqs = prereqs;
this.syncbasePromise = groupManagerPromise.then(function(gm) {
gm.syncbaseWrapper.onUpdate.add(self.processUpdates);
return gm.syncbaseWrapper;
@@ -165,9 +161,9 @@
this.invitations = {};
- prereqs.then(function(args) {
+ usernamePromise.then(function(username) {
//this will have been set prior to groupManagerPromise completing
- self.username = args.identity.username;
+ self.username = username;
});
groupManagerPromise.then(function(gm) {
diff --git a/src/sync-util/destination-sync.js b/src/sync-util/destination-sync.js
new file mode 100644
index 0000000..72df869
--- /dev/null
+++ b/src/sync-util/destination-sync.js
@@ -0,0 +1,348 @@
+// Copyright 2015 The Vanadium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+require('es6-shim');
+
+var $ = require('../util/jquery');
+var defineClass = require('../util/define-class');
+
+var _ = require('lodash');
+var uuid = require('uuid');
+
+var debug = require('../debug');
+var Place = require('../place');
+
+var marshalling = require('./marshalling');
+
+var DESTINATION_SCHEMA = [ 'place' ];
+
+var DestinationSync = defineClass({
+ statics: {
+ getDestinationIds: function(destinationsData) {
+ return marshalling.readValue(destinationsData);
+ },
+ },
+
+ publics: {
+ bindDestinations: function(destinations) {
+ if (this.destinations) {
+ this.destinations.onAdd.remove(this.handleDestinationAdd);
+ this.destinations.onRemove.remove(this.handleDestinationRemove);
+ }
+
+ this.destinations = destinations;
+
+ if (destinations) {
+ destinations.onAdd.add(this.handleDestinationAdd);
+ destinations.onRemove.add(this.handleDestinationRemove);
+ }
+ },
+
+ processDestinations: function(destinationsData) {
+ var self = this;
+
+ if (!destinationsData) {
+ if (this.tripManager.hasValidUpstream()) {
+ this.truncateDestinations(0);
+ } else {
+ //first push with no remote data; push local data as authority
+ this.pushDestinations();
+ }
+
+ } else {
+ var ids;
+ try {
+ ids = this.getDestinationIds(destinationsData);
+ if (!ids) {
+ throw new TypeError('Missing destination IDs');
+ }
+ } catch(e) {
+ this.onError(e);
+ //assume it's corrupt and overwrite
+ this.pushDestinations(true);
+ return;
+ }
+
+ $.each(ids, function(i, id) {
+ /* Don't bother reordering existing destinations by ID; instead, just
+ * overwrite everything. TODO(rosswang): optimize to reorder. */
+ var record = self.destRecords[i];
+ var destination = self.destinations.get(i);
+
+ if (!record) {
+ /* Add the record invalid so that the destination add handler leaves
+ * population to this handler. */
+ record = self.destRecords[i] = self.destinationRecord();
+ destination = self.destinations.add(i);
+ }
+
+ if (record.getId() !== id) {
+ record.setId(id);
+ debug.log('Pulling destination ' + i + ':' + id);
+ }
+
+ var destinationData = destinationsData[id];
+ var newPlace = destinationData &&
+ marshalling.unmarshal(destinationData.place);
+
+ record.setPlaceData(newPlace, function(newPlace, oldPlace) {
+ debug.log('Pulled update for destination ' + i + ':' + id +
+ '.place = ' + JSON.stringify(oldPlace) + ' => ' +
+ JSON.stringify(newPlace));
+
+ if (newPlace) {
+ var cancelled = false;
+ record.cancelPlaceAsync.add(function() {
+ cancelled = true;
+ });
+
+ Place.fromObject(self.mapsDeps, newPlace)
+ .catch(function(err) {
+ //assume it's corrupt and overwrite
+ if (!cancelled) {
+ self.updateDestinationPlace(destination);
+ throw err;
+ }
+ })
+ .then(function(place) {
+ if (!cancelled) {
+ destination.setPlace(place);
+ }
+ }).catch(function(err) {
+ self.onError(err);
+ });
+ } else {
+ destination.setPlace(null);
+ }
+ });
+ });
+
+ if (this.destRecords.length > ids.length) {
+ this.truncateDestinations(ids.length);
+ }
+ }
+ }
+ },
+
+ privates: {
+ destinationRecord: defineClass.innerClass({
+ publics: {
+ isValid: function() {
+ return this.id !== undefined;
+ },
+
+ invalidate: function() {
+ delete this.id;
+ },
+
+ getId: function() {
+ return this.id;
+ },
+
+ setId: function(id) {
+ this.id = id;
+ },
+
+ /**
+ * @param placeData the plain object representation of a `Place`.
+ * @param changedCallback a function called if the place is actually
+ * changed, with the params newPlace, oldPlace, as the new and old
+ * plain object places, respectively.
+ */
+ setPlaceData: function(placeData, changedCallback) {
+ var old = this.data.place;
+ if (!_.isEqual(old, placeData) && (old || placeData)) {
+ this.data.place = placeData;
+
+ this.cancelPlaceAsync();
+
+ if (changedCallback) {
+ changedCallback.call(this.ifc, placeData, old);
+ }
+ }
+ },
+
+ put: function(dao) {
+ var self = this;
+
+ if (this.isValid()) {
+ var key = this.key();
+ var fieldIdx = key.length;
+ var writes = [];
+
+ $.each(DESTINATION_SCHEMA, function() {
+ key[fieldIdx] = this;
+ var value = self.data[this];
+ writes.push(value?
+ dao.put(key, marshalling.marshal(value)) : dao.delete(key));
+ });
+ return Promise.all(writes);
+ } else {
+ return Promise.resolve();
+ }
+ },
+
+ delete: function(dao) {
+ if (this.isValid()) {
+ return dao.delete(this.key());
+ } else {
+ return Promise.resolve();
+ }
+ },
+ },
+
+ privates: {
+ key: function() {
+ return this.outer.tripManager.getDestinationsKey(this.id);
+ }
+ },
+
+ events: {
+ /**
+ * Utility event to allow asynchronous update processes to cancel if
+ * they do not finish by the time the place has been updated again.
+ */
+ cancelPlaceAsync: 'once'
+ },
+
+ init: function(place, generateId) {
+ if (generateId) {
+ this.id = uuid.v4();
+ }
+
+ this.data = {
+ place: place && place.toObject()
+ };
+ }
+ }),
+
+ handleDestinationAdd: function (destination) {
+ var self = this;
+
+ var index = destination.getIndex();
+ var record = this.destRecords[index];
+
+ if (!record || record.isValid()) {
+ var place = destination.getPlace();
+
+ record = this.destinationRecord(place, true);
+
+ debug.log('Adding destination ' + index + ':' + record.getId());
+
+ this.destRecords.splice(index, 0, record);
+
+ if (this.tripManager.hasValidUpstream()) {
+ this.sbw.batch(function(ops) {
+ return Promise.all([
+ self.putDestinationIds(ops),
+ record.put(ops)
+ ]);
+ });
+ }
+ }
+
+ destination.onPlaceChange.add(this.handleDestinationPlaceChange);
+ },
+
+ handleDestinationRemove: function(destination) {
+ var self = this;
+
+ var index = destination.getIndex();
+ var removed = this.destRecords.splice(index, 1)[0];
+ if (this.tripManager.hasValidUpstream() && removed.isValid()) {
+ debug.log('Removing destination ' + index + ':' + removed.getId());
+ this.sbw.batch(function(ops) {
+ return Promise.all([
+ self.putDestinationIds(ops),
+ removed.delete(ops)
+ ]);
+ });
+ }
+ },
+
+ updateDestinationPlace: function(destination) {
+ var self = this;
+
+ var index = destination.getIndex();
+ var record = this.destRecords[index];
+ var place = destination.getPlace();
+ var placeData = place && place.toObject();
+
+ if (record && record.isValid()) {
+ record.setPlaceData(placeData, function(placeData, oldPlace) {
+ if (self.tripManager.hasValidUpstream()) {
+ debug.log('Updating destination ' + index + ':' + this.getId() +
+ '.place = ' + JSON.stringify(oldPlace) + ' => ' +
+ JSON.stringify(placeData));
+
+ self.sbw.nonBatched(this.put);
+ }
+ });
+ }
+ },
+
+ pushDestinations: function(force) {
+ var self = this;
+
+ this.sbw.batch(function(ops) {
+ if (!self.tripManager.getActiveTripId()) {
+ if (force) {
+ self.tripManager.setActiveTripId(uuid.v4());
+ } else {
+ return;
+ }
+ }
+
+ self.tripManager.setUpstream();
+
+ var asyncs = self.destRecords.map(function(record) {
+ return record.put(ops);
+ });
+ asyncs.push(self.putDestinationIds(ops));
+ return Promise.all(asyncs);
+ });
+ },
+
+ /* A note on these operations: Syncbase client operations occur
+ * asynchronously, in response to events that can rapidly change state. As
+ * such, each write operation must first check to ensure the record it's
+ * updating for is still valid (has a defined id).
+ */
+
+ putDestinationIds: function(dao) {
+ var ids = this.destRecords
+ .filter(function(r) { return r.isValid(); })
+ .map(function(r) { return r.getId(); });
+ return dao.put(this.tripManager.getDestinationsKey(),
+ marshalling.marshal(ids));
+ },
+
+ truncateDestinations: function(targetLength) {
+ if (this.destinations.count() > targetLength) {
+ debug.log('Truncating destinations to ' + targetLength);
+ }
+
+ while (this.destinations.count() > targetLength) {
+ var last = this.destinations.count() - 1;
+ this.destRecords[last].invalidate();
+ this.destinations.remove(last);
+ }
+ }
+ },
+
+ init: function(mapsDependencies, deferredSyncbaseWrapper, tripManager) {
+ var self = this;
+
+ this.mapsDeps = mapsDependencies;
+ this.sbw = deferredSyncbaseWrapper;
+ this.tripManager = tripManager;
+ this.destRecords = [];
+
+ this.handleDestinationPlaceChange = function() {
+ self.updateDestinationPlace(this);
+ };
+ }
+});
+
+module.exports = DestinationSync;
diff --git a/src/sync-util/marshalling.js b/src/sync-util/marshalling.js
new file mode 100644
index 0000000..c877d64
--- /dev/null
+++ b/src/sync-util/marshalling.js
@@ -0,0 +1,31 @@
+// Copyright 2015 The Vanadium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+require('es6-shim');
+
+function marshal(x) {
+ return JSON.stringify(x);
+}
+
+function unmarshal(x) {
+ if (!x) {
+ return x;
+ }
+
+ if (typeof x === 'object') {
+ throw new TypeError('Unexpected persisted object ' + JSON.stringify(x));
+ }
+
+ return JSON.parse(x);
+}
+
+function readValue(entry) {
+ return unmarshal(typeof entry === 'object'? entry._ : entry);
+}
+
+module.exports = {
+ marshal: marshal,
+ unmarshal: unmarshal,
+ readValue: readValue
+};
\ No newline at end of file
diff --git a/src/sync-util/message-sync.js b/src/sync-util/message-sync.js
new file mode 100644
index 0000000..4053819
--- /dev/null
+++ b/src/sync-util/message-sync.js
@@ -0,0 +1,59 @@
+// Copyright 2015 The Vanadium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+var $ = require('../util/jquery');
+var defineClass = require('../util/define-class');
+
+var uuid = require('uuid');
+
+var marshalling = require('./marshalling');
+
+var MessageSync = defineClass({
+ publics: {
+ message: function(messageContent) {
+ var id = uuid.v4();
+ var payload = $.extend({
+ timestamp: Date.now()
+ }, messageContent);
+ var value = marshalling.marshal(payload);
+
+ this.sbw.put(this.tripManager.getMessagesKey(id), value);
+ },
+
+ processMessages: function(messageData) {
+ var self = this;
+
+ if (messageData) {
+ /* Dispatch new messages in time order, though don't put them before
+ * local messages. */
+ var newMessages = [];
+ $.each(messageData, function(id, serializedMessage) {
+ if (!self.messages[id]) {
+ var message = marshalling.unmarshal(serializedMessage);
+ newMessages.push(message);
+ self.messages[id] = message;
+ }
+ });
+ newMessages.sort(function(a, b) {
+ return a.timestamp < b.timestamp? -1 :
+ a.timestamp > b.timestamp? 1 :
+ 0;
+ });
+
+ this.onMessages(newMessages);
+ }
+ }
+ },
+
+ events: [ 'onMessages' ],
+
+ init: function(deferredSyncbaseWrapper, tripManager) {
+ this.sbw = deferredSyncbaseWrapper;
+ this.tripManager = tripManager;
+
+ this.messages = {};
+ }
+});
+
+module.exports = MessageSync;
\ No newline at end of file
diff --git a/src/sync-util/trip-manager.js b/src/sync-util/trip-manager.js
new file mode 100644
index 0000000..d4d46c0
--- /dev/null
+++ b/src/sync-util/trip-manager.js
@@ -0,0 +1,259 @@
+// Copyright 2015 The Vanadium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+require('es6-shim');
+
+var _ = require('lodash');
+var uuid = require('uuid');
+
+var $ = require('../util/jquery');
+var defineClass = require('../util/define-class');
+
+var debug = require('../debug');
+
+var DestinationSync = require('./destination-sync');
+
+var TripManager = defineClass({
+ statics: {
+ getTripLength: function(trip) {
+ return trip.destinations?
+ DestinationSync.getDestinationIds(trip.destinations).length : 0;
+ }
+ },
+
+ publics: {
+ createTripSyncGroup: function(groupManager, tripId) {
+ return groupManager.createSyncGroup('trip-' + tripId,
+ [['trips', tripId]]);
+ },
+
+ joinTripSyncGroup: function(owner, tripId) {
+ return this.startSyncgroupManager.then(function(gm) {
+ return gm.joinSyncGroup(owner, 'trip-' + tripId);
+ });
+ },
+
+ /**
+ * Sets the active trip to the given trip ID after it is available.
+ */
+ watchForTrip: function(tripId) {
+ this.awaitedTripId = tripId;
+ },
+
+ getMessageData: function() {
+ return this.activeTrip && this.activeTrip.messages;
+ },
+
+ getDestinationData: function() {
+ return this.activeTrip && this.activeTrip.destinations;
+ },
+
+ getActiveTripId: function() {
+ return this.activeTripId;
+ },
+
+ getActiveTripOwner: function() {
+ return this.activeTrip && this.activeTrip.owner;
+ },
+
+ setActiveTripId: function(tripId) {
+ var old = this.activeTripId;
+ this.activeTripId = tripId;
+ this.sbw.put(['user', 'tripMetadata', tripId, 'latestSwitch'],
+ Date.now());
+
+ if (old !== tripId) {
+ this.activeTrip = null;
+ this.activeTripOwner = null;
+ }
+ },
+
+ hasValidUpstream: function() {
+ return this.upstreamTripId && this.upstreamTripId === this.activeTripId;
+ },
+
+ getTripKey: function() {
+ return ['trips', this.upstreamTripId].concat(_.flattenDeep(arguments));
+ },
+
+ getDestinationsKey: function() {
+ return this.getTripKey('destinations', arguments);
+ },
+
+ getMessagesKey: function() {
+ return this.getTripKey('messages', arguments);
+ },
+
+ /**
+ * This should be called whenever the upstream should be considered ready to
+ * receive updates from local, i.e. after refreshing from remote or before
+ * pushing from local.
+ */
+ setUpstream: function() {
+ this.upstreamTripId = this.activeTripId;
+ },
+
+ processTrips: function(userTripMetadata, trips) {
+ var self = this;
+
+ this.manageTripSyncGroups(trips);
+
+ var trip;
+
+ if (this.awaitedTripId) {
+ this.setActiveTripId(this.awaitedTripId);
+ delete this.awaitedTripId;
+
+ /* Override latestSwitch this frame. (Subsequently syncbase will be up
+ * to date.) */
+ if (!userTripMetadata) {
+ userTripMetadata = {};
+ }
+ var activeTripMd = userTripMetadata[this.activeTripId];
+ if (!activeTripMd) {
+ activeTripMd = userTripMetadata[this.activeTripId] = {};
+ }
+ activeTripMd.latestSwitch = Date.now();
+ }
+
+ if (this.activeTripId) {
+ trip = trips && trips[this.activeTripId];
+ if (!trip) {
+ debug.log('Last active trip ' + this.activeTripId +
+ ' is no longer present.');
+ } else {
+ var defaultId = this.getDefaultTrip(userTripMetadata, trips);
+ if (defaultId && defaultId !== this.activeTripId &&
+ trips[defaultId]) {
+ if (this.isNascent(trip)) {
+ this.deleteTrip(this.activeTripId);
+ debug.log('Replacing nascent trip ' + this.activeTripId +
+ ' with established trip ' + defaultId);
+ } else {
+ /* TODO(rosswang): for now, sync trip changes. This behavior may
+ * change. */
+ debug.log('Replacing active trip ' + this.activeTripId +
+ ' with most recent selection ' + defaultId);
+ }
+
+ this.activeTripId = defaultId;
+ trip = trips[defaultId];
+ }
+ }
+ }
+
+ if (!trip) {
+ if (trips) {
+ this.activeTripId = this.getDefaultTrip(userTripMetadata, trips);
+ debug.log('Setting active trip ' + this.activeTripId);
+ trip = trips[this.activeTripId];
+ } else {
+ var tripId = this.activeTripId = uuid.v4();
+ debug.log('Creating new trip ' + tripId);
+ trip = {}; //don't initialize owner until the syncgroup is ready
+ this.startSyncgroupManager.then(function(gm) {
+ return Promise.all([
+ self.createTripSyncGroup(gm, tripId),
+ self.usernamePromise
+ ]).then(function(args) {
+ return gm.syncbaseWrapper.put(['trips', tripId, 'owner'],
+ args[1]).then(function() {
+ return args[0];
+ });
+ })
+ .catch(self.onError);
+ });
+ }
+ }
+
+ this.activeTrip = trip;
+ }
+ },
+
+ privates: {
+ deleteTrip: function(tripId) {
+ this.sbw.batch(function(ops) {
+ return Promise.all([
+ ops.delete(['user', 'tripMetadata', tripId]),
+ ops.delete(['trips', tripId])
+ ]);
+ });
+ },
+
+ /**
+ * Given a mapping of trip IDs to trip info with metadata, pick the trip
+ * that the user is most likely to care about.
+ */
+ getDefaultTrip: function(userTripMetadata, trips) {
+ var self = this;
+ var best = {};
+
+ $.each(trips, function(id, trip) {
+ var md = userTripMetadata && userTripMetadata[id];
+ var latestSwitch = md && md.latestSwitch;
+
+ function usurp() {
+ best.trip = trip;
+ best.id = id;
+ best.latestSwitch = latestSwitch;
+ delete best.length;
+ }
+
+ if (latestSwitch === best.latestSwitch) {
+ if (!best.trip) {
+ usurp();
+ } else {
+ if (best.length === undefined) {
+ best.length = self.getTripLength(best.trip);
+ }
+ var length = self.getTripLength(trip);
+ if (length > best.length) {
+ usurp();
+ best.length = length;
+ } else if (length === best.length && id < best.id) {
+ usurp();
+ best.length = length;
+ }
+ }
+ } else if (latestSwitch && best.latestSwitch === undefined ||
+ latestSwitch > best.latestSwitch) {
+ usurp();
+ }
+ });
+
+ return best.id;
+ },
+
+ isNascent: function(trip) {
+ return this.getTripLength(trip) <= 1;
+ },
+
+ manageTripSyncGroups: function(trips) {
+ var self = this;
+
+ //TODO(rosswang): maybe make this more intelligent, and handle ejection
+ if (trips) {
+ $.each(trips, function(tripId, trip) {
+ /* Join is idempotent, but repeatedly joining might be causing major,
+ * fatal sluggishness. TODO(rosswang): if this is not the case, maybe
+ * go ahead and poll. */
+ if (!self.joinedTrips.has(tripId) && trip.owner) {
+ self.joinedTrips.add(tripId);
+ self.joinTripSyncGroup(trip.owner, tripId).catch(self.onError);
+ }
+ });
+ }
+ }
+ },
+
+ init: function(usernamePromise, deferredSyncbaseWrapper,
+ startSyncgroupManager) {
+ this.usernamePromise = usernamePromise;
+ this.sbw = deferredSyncbaseWrapper;
+ this.startSyncgroupManager = startSyncgroupManager;
+ this.joinedTrips = new Set();
+ }
+});
+
+module.exports = TripManager;
\ No newline at end of file
diff --git a/src/travelsync.js b/src/travelsync.js
index f312efc..9e0d635 100644
--- a/src/travelsync.js
+++ b/src/travelsync.js
@@ -4,24 +4,20 @@
require('es6-shim');
-var _ = require('lodash');
-var uuid = require('uuid');
var vanadium = require('vanadium');
-var $ = require('./util/jquery');
var defineClass = require('./util/define-class');
-var debug = require('./debug');
var SyncgroupManager = require('./syncgroup-manager');
var InvitationManager = require('./invitation-manager');
-var Place = require('./place');
var DeferredSbWrapper = require('./sync-util/deferred-sb-wrapper');
+var DestinationSync = require('./sync-util/destination-sync');
+var MessageSync = require('./sync-util/message-sync');
+var TripManager = require('./sync-util/trip-manager');
var vdlTravel = require('../ifc');
-var DESTINATION_SCHEMA = [ 'place' ];
-
var TravelSync = defineClass({
/* Schema note: although we don't support merging destination list structure
* changes, we use indirection in the destination list so that we don't have
@@ -29,43 +25,23 @@
* parallel destination edits. */
publics: {
bindDestinations: function(destinations) {
- if (this.destinations) {
- this.destinations.onAdd.remove(this.handleDestinationAdd);
- this.destinations.onRemove.remove(this.handleDestinationRemove);
- }
-
- this.destinations = destinations;
-
- if (destinations) {
- destinations.onAdd.add(this.handleDestinationAdd);
- destinations.onRemove.add(this.handleDestinationRemove);
- }
+ this.destinationSync.bindDestinations(destinations);
},
message: function(messageContent) {
- var self = this;
-
- var id = uuid.v4();
- var payload = $.extend({
- timestamp: Date.now()
- }, messageContent);
- var value = this.marshal(payload);
-
- this.sbw.put(['trips', self.upstreamId, 'messages', id], value);
+ this.messageSync.message(messageContent);
},
getActiveTripId: function() {
- return this.activeTripId;
+ return this.tripManager.getActiveTripId();
},
getActiveTripOwner: function() {
- return this.activeTripOwner;
+ return this.tripManager.getActiveTripOwner();
},
setActiveTripId: function(tripId) {
- this.activeTripId = tripId;
- this.sbw.put(['user', 'tripMetadata', tripId, 'latestSwitch'],
- Date.now());
+ this.tripManager.setActiveTripId(tripId);
},
getData: function() {
@@ -76,516 +52,24 @@
* Sets the active trip to the given trip ID after it is available.
*/
watchForTrip: function(tripId) {
- this.awaitedTripId = tripId;
+ this.tripManager.watchForTrip(tripId);
},
joinTripSyncGroup: function(owner, tripId) {
- return this.startSyncgroupManager.then(function(gm) {
- return gm.joinSyncGroup(owner, 'trip-' + tripId);
- });
+ return this.tripManager.joinTripSyncGroup(owner, tripId);
}
},
privates: {
- destinationRecord: defineClass.innerClass({
- publics: {
- isValid: function() {
- return this.id !== undefined;
- },
-
- invalidate: function() {
- delete this.id;
- },
-
- getId: function() {
- return this.id;
- },
-
- setId: function(id) {
- this.id = id;
- },
-
- /**
- * @param placeData the plain object representation of a `Place`.
- * @param changedCallback a function called if the place is actually
- * changed, with the params newPlace, oldPlace, as the new and old
- * plain object places, respectively.
- */
- setPlaceData: function(placeData, changedCallback) {
- var old = this.data.place;
- if (!_.isEqual(old, placeData) && (old || placeData)) {
- this.data.place = placeData;
-
- this.cancelPlaceAsync();
-
- if (changedCallback) {
- changedCallback.call(this.ifc, placeData, old);
- }
- }
- },
-
- put: function(dao) {
- var outer = this.outer;
- var self = this;
-
- if (this.isValid()) {
- var key = this.key();
- var fieldIdx = key.length;
- var writes = [];
-
- $.each(DESTINATION_SCHEMA, function() {
- key[fieldIdx] = this;
- var value = self.data[this];
- writes.push(value?
- dao.put(key, outer.marshal(value)) : dao.delete(key));
- });
- return Promise.all(writes);
- } else {
- return Promise.resolve();
- }
- },
-
- delete: function(dao) {
- if (this.isValid()) {
- return dao.delete(this.key());
- } else {
- return Promise.resolve();
- }
- },
- },
-
- privates: {
- key: function() {
- return ['trips', this.outer.upstreamId, 'destinations', this.id];
- }
- },
-
- events: {
- /**
- * Utility event to allow asynchronous update processes to cancel if
- * they do not finish by the time the place has been updated again.
- */
- cancelPlaceAsync: 'once'
- },
-
- init: function(place, generateId) {
- if (generateId) {
- this.id = uuid.v4();
- }
-
- this.data = {
- place: place && place.toObject()
- };
- }
- }),
-
- handleDestinationAdd: function (destination) {
- var self = this;
-
- var index = destination.getIndex();
- var record = this.destRecords[index];
-
- if (!record || record.isValid()) {
- var place = destination.getPlace();
-
- record = this.destinationRecord(place, true);
-
- debug.log('Adding destination ' + index + ':' + record.getId());
-
- this.destRecords.splice(index, 0, record);
-
- if (this.hasValidUpstream()) {
- this.sbw.batch(function(ops) {
- return Promise.all([
- self.putDestinationIds(ops),
- record.put(ops)
- ]);
- });
- }
- }
-
- destination.onPlaceChange.add(this.handleDestinationPlaceChange);
- },
-
- handleDestinationRemove: function(destination) {
- var self = this;
-
- var index = destination.getIndex();
- var removed = this.destRecords.splice(index, 1)[0];
- if (this.hasValidUpstream() && removed.isValid()) {
- debug.log('Removing destination ' + index + ':' + removed.getId());
- this.sbw.batch(function(ops) {
- return Promise.all([
- self.putDestinationIds(ops),
- removed.delete(ops)
- ]);
- });
- }
- },
-
- updateDestinationPlace: function(destination) {
- var self = this;
-
- var index = destination.getIndex();
- var record = this.destRecords[index];
- var place = destination.getPlace();
- var placeData = place && place.toObject();
-
- if (record && record.isValid()) {
- record.setPlaceData(placeData, function(placeData, oldPlace) {
- if (self.hasValidUpstream()) {
- debug.log('Updating destination ' + index + ':' + this.getId() +
- '.place = ' + JSON.stringify(oldPlace) + ' => ' +
- JSON.stringify(placeData));
-
- self.sbw.nonBatched(this.put);
- }
- });
- }
- },
-
- pushDestinations: function(force) {
- var self = this;
-
- this.sbw.batch(function(ops) {
- if (!self.activeTripId) {
- if (force) {
- self.activeTripId = uuid.v4();
- } else {
- return;
- }
- }
-
- self.setUpstream();
-
- var asyncs = self.destRecords.map(function(record) {
- return record.put(ops);
- });
- asyncs.push(self.putDestinationIds(ops));
- return Promise.all(asyncs);
- });
- },
-
- /* A note on these operations: Syncbase client operations occur
- * asynchronously, in response to events that can rapidly change state. As
- * such, each write operation must first check to ensure the record it's
- * updating for is still valid (has a defined id).
- */
-
- putDestinationIds: function(dao) {
- var ids = this.destRecords
- .filter(function(r) { return r.isValid(); })
- .map(function(r) { return r.getId(); });
- return dao.put(['trips', this.upstreamId, 'destinations'],
- this.marshal(ids));
- },
-
- marshal: function(x) {
- return JSON.stringify(x);
- },
-
- unmarshal: function(x) {
- if (!x) {
- return x;
- }
-
- if (typeof x === 'object') {
- throw new TypeError('Unexpected persisted object ' + JSON.stringify(x));
- }
-
- return JSON.parse(x);
- },
-
- truncateDestinations: function(targetLength) {
- if (this.destinations.count() > targetLength) {
- debug.log('Truncating destinations to ' + targetLength);
- }
-
- while (this.destinations.count() > targetLength) {
- var last = this.destinations.count() - 1;
- this.destRecords[last].invalidate();
- this.destinations.remove(last);
- }
- },
-
- processMessages: function(messageData) {
- var self = this;
-
- if (messageData) {
- /* Dispatch new messages in time order, though don't put them before
- * local messages. */
- var newMessages = [];
- $.each(messageData, function(id, serializedMessage) {
- if (!self.messages[id]) {
- var message = self.unmarshal(serializedMessage);
- newMessages.push(message);
- self.messages[id] = message;
- }
- });
- newMessages.sort(function(a, b) {
- return a.timestamp < b.timestamp? -1 :
- a.timestamp > b.timestamp? 1 :
- 0;
- });
-
- this.onMessages(newMessages);
- }
- },
-
- getDestinationIds: function(destinationsData) {
- return this.unmarshal(typeof destinationsData === 'object'?
- destinationsData._ : destinationsData);
- },
-
- processDestinations: function(destinationsData) {
- var self = this;
-
- if (!destinationsData) {
- if (this.hasValidUpstream()) {
- this.truncateDestinations(0);
- } else {
- //first push with no remote data; push local data as authority
- this.pushDestinations();
- }
-
- } else {
- var ids;
- try {
- ids = this.getDestinationIds(destinationsData);
- if (!ids) {
- throw new TypeError('Missing destination IDs');
- }
- } catch(e) {
- this.onError(e);
- //assume it's corrupt and overwrite
- this.pushDestinations(true);
- return;
- }
-
- $.each(ids, function(i, id) {
- /* Don't bother reordering existing destinations by ID; instead, just
- * overwrite everything. TODO(rosswang): optimize to reorder. */
- var record = self.destRecords[i];
- var destination = self.destinations.get(i);
-
- if (!record) {
- /* Add the record invalid so that the destination add handler leaves
- * population to this handler. */
- record = self.destRecords[i] = self.destinationRecord();
- destination = self.destinations.add(i);
- }
-
- if (record.getId() !== id) {
- record.setId(id);
- debug.log('Pulling destination ' + i + ':' + id);
- }
-
- var destinationData = destinationsData[id];
- var newPlace = destinationData &&
- self.unmarshal(destinationData.place);
-
- record.setPlaceData(newPlace, function(newPlace, oldPlace) {
- debug.log('Pulled update for destination ' + i + ':' + id +
- '.place = ' + JSON.stringify(oldPlace) + ' => ' +
- JSON.stringify(newPlace));
-
- if (newPlace) {
- var cancelled = false;
- record.cancelPlaceAsync.add(function() {
- cancelled = true;
- });
-
- Place.fromObject(self.mapsDeps, newPlace)
- .catch(function(err) {
- //assume it's corrupt and overwrite
- if (!cancelled) {
- self.updateDestinationPlace(destination);
- throw err;
- }
- })
- .then(function(place) {
- if (!cancelled) {
- destination.setPlace(place);
- }
- }).catch(function(err) {
- self.onError(err);
- });
- } else {
- destination.setPlace(null);
- }
- });
- });
-
- if (this.destRecords.length > ids.length) {
- this.truncateDestinations(ids.length);
- }
- }
-
- this.setUpstream();
- },
-
- deleteTrip: function(tripId) {
- this.sbw.batch(function(ops) {
- return Promise.all([
- ops.delete(['user', 'tripMetadata', tripId]),
- ops.delete(['trips', tripId])
- ]);
- });
- },
-
- /**
- * Given a mapping of trip IDs to trip info with metadata, pick the trip
- * that the user is most likely to care about.
- */
- getDefaultTrip: function(userTripMetadata, trips) {
- var self = this;
- var best = {};
-
- $.each(trips, function(id, trip) {
- var md = userTripMetadata && userTripMetadata[id];
- var latestSwitch = md && md.latestSwitch;
-
- function usurp() {
- best.trip = trip;
- best.id = id;
- best.latestSwitch = latestSwitch;
- delete best.length;
- }
-
- if (latestSwitch === best.latestSwitch) {
- if (!best.trip) {
- usurp();
- } else {
- if (best.length === undefined) {
- best.length = self.getDestinationIds(
- best.trip.destinations).length;
- }
- var length = self.getDestinationIds(trip.destinations).length;
- if (length > best.length) {
- usurp();
- best.length = length;
- } else if (length === best.length && id < best.id) {
- usurp();
- best.length = length;
- }
- }
- } else if (latestSwitch && best.latestSwitch === undefined ||
- latestSwitch > best.latestSwitch) {
- usurp();
- }
- });
-
- return best.id;
- },
-
- isNascent: function(trip) {
- return !trip.destinations ||
- this.getDestinationIds(trip.destinations).length <= 1;
- },
-
- manageTripSyncGroups: function(trips) {
- var self = this;
-
- //TODO(rosswang): maybe make this more intelligent, and handle ejection
- if (trips) {
- $.each(trips, function(tripId, trip) {
- /* Join is idempotent, but repeatedly joining might be causing major,
- * fatal sluggishness. TODO(rosswang): if this is not the case, maybe
- * go ahead and poll. */
- if (!self.joinedTrips.has(tripId) && trip.owner) {
- self.joinedTrips.add(tripId);
- self.joinTripSyncGroup(trip.owner, tripId).catch(self.onError);
- }
- });
- }
- },
-
- processTrips: function(userTripMetadata, trips) {
- var self = this;
-
- this.manageTripSyncGroups(trips);
-
- var trip;
-
- if (this.awaitedTripId) {
- this.setActiveTripId(this.awaitedTripId);
- delete this.awaitedTripId;
-
- /* Override latestSwitch this frame. (Subsequently syncbase will be up
- * to date.) */
- if (!userTripMetadata) {
- userTripMetadata = {};
- }
- var activeTripMd = userTripMetadata[this.activeTripId];
- if (!activeTripMd) {
- activeTripMd = userTripMetadata[this.activeTripId] = {};
- }
- activeTripMd.latestSwitch = Date.now();
- }
-
- if (this.activeTripId) {
- trip = trips && trips[this.activeTripId];
- if (!trip) {
- debug.log('Last active trip ' + this.activeTripId +
- ' is no longer present.');
- } else {
- var defaultId = this.getDefaultTrip(userTripMetadata, trips);
- if (defaultId && defaultId !== this.activeTripId &&
- trips[defaultId]) {
- if (this.isNascent(trip)) {
- this.deleteTrip(this.activeTripId);
- debug.log('Replacing nascent trip ' + this.activeTripId +
- ' with established trip ' + defaultId);
- } else {
- /* TODO(rosswang): for now, sync trip changes. This behavior may
- * change. */
- debug.log('Replacing active trip ' + this.activeTripId +
- ' with most recent selection ' + defaultId);
- }
-
- this.activeTripId = defaultId;
- trip = trips[defaultId];
- }
- }
- }
-
- if (!trip) {
- if (trips) {
- this.activeTripId = this.getDefaultTrip(userTripMetadata, trips);
- debug.log('Setting active trip ' + this.activeTripId);
- trip = trips[this.activeTripId];
- } else {
- var tripId = this.activeTripId = uuid.v4();
- debug.log('Creating new trip ' + tripId);
- trip = {}; //don't initialize owner until the syncgroup is ready
- this.startSyncgroupManager.then(function(gm) {
- return self.createTripSyncGroup(gm, tripId)
- .then(function(sg) {
- return gm.syncbaseWrapper.put(['trips', tripId, 'owner'],
- self.invitationManager.getUsername()).then(function() {
- return sg;
- });
- })
- .catch(self.onError);
- });
- }
- }
-
- this.activeTripOwner = trip.owner;
- this.processMessages(trip.messages);
- this.processDestinations(trip.destinations);
- },
-
processUpdates: function(data) {
- this.processTrips(data.user && data.user.tripMetadata, data.trips);
- },
+ this.tripManager.processTrips(data.user && data.user.tripMetadata,
+ data.trips);
- hasValidUpstream: function() {
- return this.upstreamId && this.upstreamId === this.activeTripId;
- },
+ this.messageSync.processMessages(this.tripManager.getMessageData());
+ this.destinationSync.processDestinations(
+ this.tripManager.getDestinationData());
- setUpstream: function() {
- this.upstreamId = this.activeTripId;
+ this.tripManager.setUpstream();
},
serve: function(args) {
@@ -641,11 +125,6 @@
self.status.usersSyncGroup = 'failed';
throw err;
});
- },
-
- createTripSyncGroup: function(groupManager, tripId) {
- return groupManager.createSyncGroup('trip-' + tripId,
- [['trips', tripId]]);
}
},
@@ -683,14 +162,10 @@
init: function(prereqs, mapsDependencies, syncbaseName) {
var self = this;
- this.mapsDeps = mapsDependencies;
this.syncbaseName = syncbaseName;
this.tripStatus = {};
- this.messages = {};
- this.destRecords = [];
this.status = {};
- this.joinedTrips = new Set();
this.server = new vdlTravel.TravelSync();
var startRpc = prereqs.then(this.serve);
@@ -708,6 +183,18 @@
var createPrimarySyncGroup = this.startSyncgroupManager
.then(this.createPrimarySyncGroup);
+ var usernamePromise = prereqs.then(function(args) {
+ return args.identity.username;
+ });
+
+ this.tripManager = new TripManager(
+ usernamePromise, sbw, this.startSyncgroupManager);
+ this.messageSync = new MessageSync(sbw, this.tripManager);
+ this.destinationSync = new DestinationSync(
+ mapsDependencies, sbw, this.tripManager);
+
+ this.messageSync.onMessages.add(this.onMessages);
+
this.startup = Promise.all([
startRpc,
startSyncbase,
@@ -721,13 +208,9 @@
};
});
- this.invitationManager = new InvitationManager(prereqs,
+ this.invitationManager = new InvitationManager(usernamePromise,
this.startSyncgroupManager);
this.invitationManager.onError.add(this.onError);
-
- this.handleDestinationPlaceChange = function() {
- self.updateDestinationPlace(this);
- };
}
});