Factoring out facets of TravelSync

Change-Id: I9319ce7dbd23acddd1279e1b527dac13e7c2b20e
diff --git a/src/invitation-manager.js b/src/invitation-manager.js
index fc98c76..86399db 100644
--- a/src/invitation-manager.js
+++ b/src/invitation-manager.js
@@ -150,13 +150,9 @@
     onError: 'memory'
   },
 
-  /**
-   * @param prereqs promise of { identity, mountNames, vanadiumWrapper }
-   */
-  init: function(prereqs, groupManagerPromise) {
+  init: function(usernamePromise, groupManagerPromise) {
     var self = this;
 
-    this.prereqs = prereqs;
     this.syncbasePromise = groupManagerPromise.then(function(gm) {
       gm.syncbaseWrapper.onUpdate.add(self.processUpdates);
       return gm.syncbaseWrapper;
@@ -165,9 +161,9 @@
 
     this.invitations = {};
 
-    prereqs.then(function(args) {
+    usernamePromise.then(function(username) {
       //this will have been set prior to groupManagerPromise completing
-      self.username = args.identity.username;
+      self.username = username;
     });
 
     groupManagerPromise.then(function(gm) {
diff --git a/src/sync-util/destination-sync.js b/src/sync-util/destination-sync.js
new file mode 100644
index 0000000..72df869
--- /dev/null
+++ b/src/sync-util/destination-sync.js
@@ -0,0 +1,348 @@
+// Copyright 2015 The Vanadium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+require('es6-shim');
+
+var $ = require('../util/jquery');
+var defineClass = require('../util/define-class');
+
+var _ = require('lodash');
+var uuid = require('uuid');
+
+var debug = require('../debug');
+var Place = require('../place');
+
+var marshalling = require('./marshalling');
+
+var DESTINATION_SCHEMA = [ 'place' ];
+
+var DestinationSync = defineClass({
+  statics: {
+    getDestinationIds: function(destinationsData) {
+      return marshalling.readValue(destinationsData);
+    },
+  },
+
+  publics: {
+    bindDestinations: function(destinations) {
+      if (this.destinations) {
+        this.destinations.onAdd.remove(this.handleDestinationAdd);
+        this.destinations.onRemove.remove(this.handleDestinationRemove);
+      }
+
+      this.destinations = destinations;
+
+      if (destinations) {
+        destinations.onAdd.add(this.handleDestinationAdd);
+        destinations.onRemove.add(this.handleDestinationRemove);
+      }
+    },
+
+    processDestinations: function(destinationsData) {
+      var self = this;
+
+      if (!destinationsData) {
+        if (this.tripManager.hasValidUpstream()) {
+          this.truncateDestinations(0);
+        } else {
+          //first push with no remote data; push local data as authority
+          this.pushDestinations();
+        }
+
+      } else {
+        var ids;
+        try {
+          ids = this.getDestinationIds(destinationsData);
+          if (!ids) {
+            throw new TypeError('Missing destination IDs');
+          }
+        } catch(e) {
+          this.onError(e);
+          //assume it's corrupt and overwrite
+          this.pushDestinations(true);
+          return;
+        }
+
+        $.each(ids, function(i, id) {
+          /* Don't bother reordering existing destinations by ID; instead, just
+           * overwrite everything. TODO(rosswang): optimize to reorder. */
+          var record = self.destRecords[i];
+          var destination = self.destinations.get(i);
+
+          if (!record) {
+            /* Add the record invalid so that the destination add handler leaves
+             * population to this handler. */
+            record = self.destRecords[i] = self.destinationRecord();
+            destination = self.destinations.add(i);
+          }
+
+          if (record.getId() !== id) {
+            record.setId(id);
+            debug.log('Pulling destination ' + i + ':' + id);
+          }
+
+          var destinationData = destinationsData[id];
+          var newPlace = destinationData &&
+            marshalling.unmarshal(destinationData.place);
+
+          record.setPlaceData(newPlace, function(newPlace, oldPlace) {
+            debug.log('Pulled update for destination ' + i + ':' + id +
+              '.place = ' + JSON.stringify(oldPlace) + ' => ' +
+              JSON.stringify(newPlace));
+
+            if (newPlace) {
+              var cancelled = false;
+              record.cancelPlaceAsync.add(function() {
+                cancelled = true;
+              });
+
+              Place.fromObject(self.mapsDeps, newPlace)
+                .catch(function(err) {
+                  //assume it's corrupt and overwrite
+                  if (!cancelled) {
+                    self.updateDestinationPlace(destination);
+                    throw err;
+                  }
+                })
+                .then(function(place) {
+                  if (!cancelled) {
+                    destination.setPlace(place);
+                  }
+                }).catch(function(err) {
+                  self.onError(err);
+                });
+            } else {
+              destination.setPlace(null);
+            }
+          });
+        });
+
+        if (this.destRecords.length > ids.length) {
+          this.truncateDestinations(ids.length);
+        }
+      }
+    }
+  },
+
+  privates: {
+    destinationRecord: defineClass.innerClass({
+      publics: {
+        isValid: function() {
+          return this.id !== undefined;
+        },
+
+        invalidate: function() {
+          delete this.id;
+        },
+
+        getId: function() {
+          return this.id;
+        },
+
+        setId: function(id) {
+          this.id = id;
+        },
+
+        /**
+         * @param placeData the plain object representation of a `Place`.
+         * @param changedCallback a function called if the place is actually
+         *  changed, with the params newPlace, oldPlace, as the new and old
+         *  plain object places, respectively.
+         */
+        setPlaceData: function(placeData, changedCallback) {
+          var old = this.data.place;
+          if (!_.isEqual(old, placeData) && (old || placeData)) {
+            this.data.place = placeData;
+
+            this.cancelPlaceAsync();
+
+            if (changedCallback) {
+              changedCallback.call(this.ifc, placeData, old);
+            }
+          }
+        },
+
+        put: function(dao) {
+          var self = this;
+
+          if (this.isValid()) {
+            var key = this.key();
+            var fieldIdx = key.length;
+            var writes = [];
+
+            $.each(DESTINATION_SCHEMA, function() {
+              key[fieldIdx] = this;
+              var value = self.data[this];
+              writes.push(value?
+                dao.put(key, marshalling.marshal(value)) : dao.delete(key));
+            });
+            return Promise.all(writes);
+          } else {
+            return Promise.resolve();
+          }
+        },
+
+        delete: function(dao) {
+          if (this.isValid()) {
+            return dao.delete(this.key());
+          } else {
+            return Promise.resolve();
+          }
+        },
+      },
+
+      privates: {
+        key: function() {
+          return this.outer.tripManager.getDestinationsKey(this.id);
+        }
+      },
+
+      events: {
+        /**
+         * Utility event to allow asynchronous update processes to cancel if
+         * they do not finish by the time the place has been updated again.
+         */
+        cancelPlaceAsync: 'once'
+      },
+
+      init: function(place, generateId) {
+        if (generateId) {
+          this.id = uuid.v4();
+        }
+
+        this.data = {
+          place: place && place.toObject()
+        };
+      }
+    }),
+
+    handleDestinationAdd: function (destination) {
+      var self = this;
+
+      var index = destination.getIndex();
+      var record = this.destRecords[index];
+
+      if (!record || record.isValid()) {
+        var place = destination.getPlace();
+
+        record = this.destinationRecord(place, true);
+
+        debug.log('Adding destination ' + index + ':' + record.getId());
+
+        this.destRecords.splice(index, 0, record);
+
+        if (this.tripManager.hasValidUpstream()) {
+          this.sbw.batch(function(ops) {
+            return Promise.all([
+              self.putDestinationIds(ops),
+              record.put(ops)
+            ]);
+          });
+        }
+      }
+
+      destination.onPlaceChange.add(this.handleDestinationPlaceChange);
+    },
+
+    handleDestinationRemove: function(destination) {
+      var self = this;
+
+      var index = destination.getIndex();
+      var removed = this.destRecords.splice(index, 1)[0];
+      if (this.tripManager.hasValidUpstream() && removed.isValid()) {
+        debug.log('Removing destination ' + index + ':' + removed.getId());
+        this.sbw.batch(function(ops) {
+          return Promise.all([
+            self.putDestinationIds(ops),
+            removed.delete(ops)
+          ]);
+        });
+      }
+    },
+
+    updateDestinationPlace: function(destination) {
+      var self = this;
+
+      var index = destination.getIndex();
+      var record = this.destRecords[index];
+      var place = destination.getPlace();
+      var placeData = place && place.toObject();
+
+      if (record && record.isValid()) {
+        record.setPlaceData(placeData, function(placeData, oldPlace) {
+          if (self.tripManager.hasValidUpstream()) {
+            debug.log('Updating destination ' + index + ':' + this.getId() +
+              '.place = ' + JSON.stringify(oldPlace) + ' => ' +
+              JSON.stringify(placeData));
+
+            self.sbw.nonBatched(this.put);
+          }
+        });
+      }
+    },
+
+    pushDestinations: function(force) {
+      var self = this;
+
+      this.sbw.batch(function(ops) {
+        if (!self.tripManager.getActiveTripId()) {
+          if (force) {
+            self.tripManager.setActiveTripId(uuid.v4());
+          } else {
+            return;
+          }
+        }
+
+        self.tripManager.setUpstream();
+
+        var asyncs = self.destRecords.map(function(record) {
+          return record.put(ops);
+        });
+        asyncs.push(self.putDestinationIds(ops));
+        return Promise.all(asyncs);
+      });
+    },
+
+    /* A note on these operations: Syncbase client operations occur
+     * asynchronously, in response to events that can rapidly change state. As
+     * such, each write operation must first check to ensure the record it's
+     * updating for is still valid (has a defined id).
+     */
+
+    putDestinationIds: function(dao) {
+      var ids = this.destRecords
+        .filter(function(r) { return r.isValid(); })
+        .map(function(r) { return r.getId(); });
+      return dao.put(this.tripManager.getDestinationsKey(),
+        marshalling.marshal(ids));
+    },
+
+    truncateDestinations: function(targetLength) {
+      if (this.destinations.count() > targetLength) {
+        debug.log('Truncating destinations to ' + targetLength);
+      }
+
+      while (this.destinations.count() > targetLength) {
+        var last = this.destinations.count() - 1;
+        this.destRecords[last].invalidate();
+        this.destinations.remove(last);
+      }
+    }
+  },
+
+  init: function(mapsDependencies, deferredSyncbaseWrapper, tripManager) {
+    var self = this;
+
+    this.mapsDeps = mapsDependencies;
+    this.sbw = deferredSyncbaseWrapper;
+    this.tripManager = tripManager;
+    this.destRecords = [];
+
+    this.handleDestinationPlaceChange = function() {
+      self.updateDestinationPlace(this);
+    };
+  }
+});
+
+module.exports = DestinationSync;
diff --git a/src/sync-util/marshalling.js b/src/sync-util/marshalling.js
new file mode 100644
index 0000000..c877d64
--- /dev/null
+++ b/src/sync-util/marshalling.js
@@ -0,0 +1,31 @@
+// Copyright 2015 The Vanadium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+require('es6-shim');
+
+function marshal(x) {
+  return JSON.stringify(x);
+}
+
+function unmarshal(x) {
+  if (!x) {
+    return x;
+  }
+
+  if (typeof x === 'object') {
+    throw new TypeError('Unexpected persisted object ' + JSON.stringify(x));
+  }
+
+  return JSON.parse(x);
+}
+
+function readValue(entry) {
+  return unmarshal(typeof entry === 'object'? entry._ : entry);
+}
+
+module.exports = {
+  marshal: marshal,
+  unmarshal: unmarshal,
+  readValue: readValue
+};
\ No newline at end of file
diff --git a/src/sync-util/message-sync.js b/src/sync-util/message-sync.js
new file mode 100644
index 0000000..4053819
--- /dev/null
+++ b/src/sync-util/message-sync.js
@@ -0,0 +1,59 @@
+// Copyright 2015 The Vanadium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+var $ = require('../util/jquery');
+var defineClass = require('../util/define-class');
+
+var uuid = require('uuid');
+
+var marshalling = require('./marshalling');
+
+var MessageSync = defineClass({
+  publics: {
+    message: function(messageContent) {
+      var id = uuid.v4();
+      var payload = $.extend({
+        timestamp: Date.now()
+      }, messageContent);
+      var value = marshalling.marshal(payload);
+
+      this.sbw.put(this.tripManager.getMessagesKey(id), value);
+    },
+
+    processMessages: function(messageData) {
+      var self = this;
+
+      if (messageData) {
+        /* Dispatch new messages in time order, though don't put them before
+         * local messages. */
+        var newMessages = [];
+        $.each(messageData, function(id, serializedMessage) {
+          if (!self.messages[id]) {
+            var message = marshalling.unmarshal(serializedMessage);
+            newMessages.push(message);
+            self.messages[id] = message;
+          }
+        });
+        newMessages.sort(function(a, b) {
+          return a.timestamp < b.timestamp? -1 :
+                 a.timestamp > b.timestamp?  1 :
+                                             0;
+        });
+
+        this.onMessages(newMessages);
+      }
+    }
+  },
+
+  events: [ 'onMessages' ],
+
+  init: function(deferredSyncbaseWrapper, tripManager) {
+    this.sbw = deferredSyncbaseWrapper;
+    this.tripManager = tripManager;
+
+    this.messages = {};
+  }
+});
+
+module.exports = MessageSync;
\ No newline at end of file
diff --git a/src/sync-util/trip-manager.js b/src/sync-util/trip-manager.js
new file mode 100644
index 0000000..d4d46c0
--- /dev/null
+++ b/src/sync-util/trip-manager.js
@@ -0,0 +1,259 @@
+// Copyright 2015 The Vanadium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+require('es6-shim');
+
+var _ = require('lodash');
+var uuid = require('uuid');
+
+var $ = require('../util/jquery');
+var defineClass = require('../util/define-class');
+
+var debug = require('../debug');
+
+var DestinationSync = require('./destination-sync');
+
+var TripManager = defineClass({
+  statics: {
+    getTripLength: function(trip) {
+      return trip.destinations?
+        DestinationSync.getDestinationIds(trip.destinations).length : 0;
+    }
+  },
+
+  publics: {
+    createTripSyncGroup: function(groupManager, tripId) {
+      return groupManager.createSyncGroup('trip-' + tripId,
+        [['trips', tripId]]);
+    },
+
+    joinTripSyncGroup: function(owner, tripId) {
+      return this.startSyncgroupManager.then(function(gm) {
+        return gm.joinSyncGroup(owner, 'trip-' + tripId);
+      });
+    },
+
+    /**
+     * Sets the active trip to the given trip ID after it is available.
+     */
+    watchForTrip: function(tripId) {
+      this.awaitedTripId = tripId;
+    },
+
+    getMessageData: function() {
+      return this.activeTrip && this.activeTrip.messages;
+    },
+
+    getDestinationData: function() {
+      return this.activeTrip && this.activeTrip.destinations;
+    },
+
+    getActiveTripId: function() {
+      return this.activeTripId;
+    },
+
+    getActiveTripOwner: function() {
+      return this.activeTrip && this.activeTrip.owner;
+    },
+
+    setActiveTripId: function(tripId) {
+      var old = this.activeTripId;
+      this.activeTripId = tripId;
+      this.sbw.put(['user', 'tripMetadata', tripId, 'latestSwitch'],
+        Date.now());
+
+      if (old !== tripId) {
+        this.activeTrip = null;
+        this.activeTripOwner = null;
+      }
+    },
+
+    hasValidUpstream: function() {
+      return this.upstreamTripId && this.upstreamTripId === this.activeTripId;
+    },
+
+    getTripKey: function() {
+      return ['trips', this.upstreamTripId].concat(_.flattenDeep(arguments));
+    },
+
+    getDestinationsKey: function() {
+      return this.getTripKey('destinations', arguments);
+    },
+
+    getMessagesKey: function() {
+      return this.getTripKey('messages', arguments);
+    },
+
+    /**
+     * This should be called whenever the upstream should be considered ready to
+     * receive updates from local, i.e. after refreshing from remote or before
+     * pushing from local.
+     */
+    setUpstream: function() {
+      this.upstreamTripId = this.activeTripId;
+    },
+
+    processTrips: function(userTripMetadata, trips) {
+      var self = this;
+
+      this.manageTripSyncGroups(trips);
+
+      var trip;
+
+      if (this.awaitedTripId) {
+        this.setActiveTripId(this.awaitedTripId);
+        delete this.awaitedTripId;
+
+        /* Override latestSwitch this frame. (Subsequently syncbase will be up
+         * to date.) */
+        if (!userTripMetadata) {
+          userTripMetadata = {};
+        }
+        var activeTripMd = userTripMetadata[this.activeTripId];
+        if (!activeTripMd) {
+          activeTripMd = userTripMetadata[this.activeTripId] = {};
+        }
+        activeTripMd.latestSwitch = Date.now();
+      }
+
+      if (this.activeTripId) {
+        trip = trips && trips[this.activeTripId];
+        if (!trip) {
+          debug.log('Last active trip ' + this.activeTripId +
+            ' is no longer present.');
+        } else {
+          var defaultId = this.getDefaultTrip(userTripMetadata, trips);
+          if (defaultId && defaultId !== this.activeTripId &&
+              trips[defaultId]) {
+            if (this.isNascent(trip)) {
+              this.deleteTrip(this.activeTripId);
+              debug.log('Replacing nascent trip ' + this.activeTripId +
+                ' with established trip ' + defaultId);
+            } else {
+              /* TODO(rosswang): for now, sync trip changes. This behavior may
+               * change. */
+              debug.log('Replacing active trip ' + this.activeTripId +
+                ' with most recent selection ' + defaultId);
+            }
+
+            this.activeTripId = defaultId;
+            trip = trips[defaultId];
+          }
+        }
+      }
+
+      if (!trip) {
+        if (trips) {
+          this.activeTripId = this.getDefaultTrip(userTripMetadata, trips);
+          debug.log('Setting active trip ' + this.activeTripId);
+          trip = trips[this.activeTripId];
+        } else {
+          var tripId = this.activeTripId = uuid.v4();
+          debug.log('Creating new trip ' + tripId);
+          trip = {}; //don't initialize owner until the syncgroup is ready
+          this.startSyncgroupManager.then(function(gm) {
+            return Promise.all([
+                self.createTripSyncGroup(gm, tripId),
+                self.usernamePromise
+              ]).then(function(args) {
+                return gm.syncbaseWrapper.put(['trips', tripId, 'owner'],
+                  args[1]).then(function() {
+                    return args[0];
+                  });
+              })
+              .catch(self.onError);
+          });
+        }
+      }
+
+      this.activeTrip = trip;
+    }
+  },
+
+  privates: {
+    deleteTrip: function(tripId) {
+      this.sbw.batch(function(ops) {
+        return Promise.all([
+          ops.delete(['user', 'tripMetadata', tripId]),
+          ops.delete(['trips', tripId])
+        ]);
+      });
+    },
+
+    /**
+     * Given a mapping of trip IDs to trip info with metadata, pick the trip
+     * that the user is most likely to care about.
+     */
+    getDefaultTrip: function(userTripMetadata, trips) {
+      var self = this;
+      var best = {};
+
+      $.each(trips, function(id, trip) {
+        var md = userTripMetadata && userTripMetadata[id];
+        var latestSwitch = md && md.latestSwitch;
+
+        function usurp() {
+          best.trip = trip;
+          best.id = id;
+          best.latestSwitch = latestSwitch;
+          delete best.length;
+        }
+
+        if (latestSwitch === best.latestSwitch) {
+          if (!best.trip) {
+            usurp();
+          } else {
+            if (best.length === undefined) {
+              best.length = self.getTripLength(best.trip);
+            }
+            var length = self.getTripLength(trip);
+            if (length > best.length) {
+              usurp();
+              best.length = length;
+            } else if (length === best.length && id < best.id) {
+              usurp();
+              best.length = length;
+            }
+          }
+        } else if (latestSwitch && best.latestSwitch === undefined ||
+            latestSwitch > best.latestSwitch) {
+          usurp();
+        }
+      });
+
+      return best.id;
+    },
+
+    isNascent: function(trip) {
+      return this.getTripLength(trip) <= 1;
+    },
+
+    manageTripSyncGroups: function(trips) {
+      var self = this;
+
+      //TODO(rosswang): maybe make this more intelligent, and handle ejection
+      if (trips) {
+        $.each(trips, function(tripId, trip) {
+          /* Join is idempotent, but repeatedly joining might be causing major,
+           * fatal sluggishness. TODO(rosswang): if this is not the case, maybe
+           * go ahead and poll. */
+          if (!self.joinedTrips.has(tripId) && trip.owner) {
+            self.joinedTrips.add(tripId);
+            self.joinTripSyncGroup(trip.owner, tripId).catch(self.onError);
+          }
+        });
+      }
+    }
+  },
+
+  init: function(usernamePromise, deferredSyncbaseWrapper,
+      startSyncgroupManager) {
+    this.usernamePromise =  usernamePromise;
+    this.sbw = deferredSyncbaseWrapper;
+    this.startSyncgroupManager = startSyncgroupManager;
+    this.joinedTrips = new Set();
+  }
+});
+
+module.exports = TripManager;
\ No newline at end of file
diff --git a/src/travelsync.js b/src/travelsync.js
index f312efc..9e0d635 100644
--- a/src/travelsync.js
+++ b/src/travelsync.js
@@ -4,24 +4,20 @@
 
 require('es6-shim');
 
-var _ = require('lodash');
-var uuid = require('uuid');
 var vanadium = require('vanadium');
 
-var $ = require('./util/jquery');
 var defineClass = require('./util/define-class');
 
-var debug = require('./debug');
 var SyncgroupManager = require('./syncgroup-manager');
 var InvitationManager = require('./invitation-manager');
-var Place = require('./place');
 
 var DeferredSbWrapper = require('./sync-util/deferred-sb-wrapper');
+var DestinationSync = require('./sync-util/destination-sync');
+var MessageSync = require('./sync-util/message-sync');
+var TripManager = require('./sync-util/trip-manager');
 
 var vdlTravel = require('../ifc');
 
-var DESTINATION_SCHEMA = [ 'place' ];
-
 var TravelSync = defineClass({
   /* Schema note: although we don't support merging destination list structure
    * changes, we use indirection in the destination list so that we don't have
@@ -29,43 +25,23 @@
    * parallel destination edits. */
   publics: {
     bindDestinations: function(destinations) {
-      if (this.destinations) {
-        this.destinations.onAdd.remove(this.handleDestinationAdd);
-        this.destinations.onRemove.remove(this.handleDestinationRemove);
-      }
-
-      this.destinations = destinations;
-
-      if (destinations) {
-        destinations.onAdd.add(this.handleDestinationAdd);
-        destinations.onRemove.add(this.handleDestinationRemove);
-      }
+      this.destinationSync.bindDestinations(destinations);
     },
 
     message: function(messageContent) {
-      var self = this;
-
-      var id = uuid.v4();
-      var payload = $.extend({
-        timestamp: Date.now()
-      }, messageContent);
-      var value = this.marshal(payload);
-
-      this.sbw.put(['trips', self.upstreamId, 'messages', id], value);
+      this.messageSync.message(messageContent);
     },
 
     getActiveTripId: function() {
-      return this.activeTripId;
+      return this.tripManager.getActiveTripId();
     },
 
     getActiveTripOwner: function() {
-      return this.activeTripOwner;
+      return this.tripManager.getActiveTripOwner();
     },
 
     setActiveTripId: function(tripId) {
-      this.activeTripId = tripId;
-      this.sbw.put(['user', 'tripMetadata', tripId, 'latestSwitch'],
-        Date.now());
+      this.tripManager.setActiveTripId(tripId);
     },
 
     getData: function() {
@@ -76,516 +52,24 @@
      * Sets the active trip to the given trip ID after it is available.
      */
     watchForTrip: function(tripId) {
-      this.awaitedTripId = tripId;
+      this.tripManager.watchForTrip(tripId);
     },
 
     joinTripSyncGroup: function(owner, tripId) {
-      return this.startSyncgroupManager.then(function(gm) {
-        return gm.joinSyncGroup(owner, 'trip-' + tripId);
-      });
+      return this.tripManager.joinTripSyncGroup(owner, tripId);
     }
   },
 
   privates: {
-    destinationRecord: defineClass.innerClass({
-      publics: {
-        isValid: function() {
-          return this.id !== undefined;
-        },
-
-        invalidate: function() {
-          delete this.id;
-        },
-
-        getId: function() {
-          return this.id;
-        },
-
-        setId: function(id) {
-          this.id = id;
-        },
-
-        /**
-         * @param placeData the plain object representation of a `Place`.
-         * @param changedCallback a function called if the place is actually
-         *  changed, with the params newPlace, oldPlace, as the new and old
-         *  plain object places, respectively.
-         */
-        setPlaceData: function(placeData, changedCallback) {
-          var old = this.data.place;
-          if (!_.isEqual(old, placeData) && (old || placeData)) {
-            this.data.place = placeData;
-
-            this.cancelPlaceAsync();
-
-            if (changedCallback) {
-              changedCallback.call(this.ifc, placeData, old);
-            }
-          }
-        },
-
-        put: function(dao) {
-          var outer = this.outer;
-          var self = this;
-
-          if (this.isValid()) {
-            var key = this.key();
-            var fieldIdx = key.length;
-            var writes = [];
-
-            $.each(DESTINATION_SCHEMA, function() {
-              key[fieldIdx] = this;
-              var value = self.data[this];
-              writes.push(value?
-                dao.put(key, outer.marshal(value)) : dao.delete(key));
-            });
-            return Promise.all(writes);
-          } else {
-            return Promise.resolve();
-          }
-        },
-
-        delete: function(dao) {
-          if (this.isValid()) {
-            return dao.delete(this.key());
-          } else {
-            return Promise.resolve();
-          }
-        },
-      },
-
-      privates: {
-        key: function() {
-          return ['trips', this.outer.upstreamId, 'destinations', this.id];
-        }
-      },
-
-      events: {
-        /**
-         * Utility event to allow asynchronous update processes to cancel if
-         * they do not finish by the time the place has been updated again.
-         */
-        cancelPlaceAsync: 'once'
-      },
-
-      init: function(place, generateId) {
-        if (generateId) {
-          this.id = uuid.v4();
-        }
-
-        this.data = {
-          place: place && place.toObject()
-        };
-      }
-    }),
-
-    handleDestinationAdd: function (destination) {
-      var self = this;
-
-      var index = destination.getIndex();
-      var record = this.destRecords[index];
-
-      if (!record || record.isValid()) {
-        var place = destination.getPlace();
-
-        record = this.destinationRecord(place, true);
-
-        debug.log('Adding destination ' + index + ':' + record.getId());
-
-        this.destRecords.splice(index, 0, record);
-
-        if (this.hasValidUpstream()) {
-          this.sbw.batch(function(ops) {
-            return Promise.all([
-              self.putDestinationIds(ops),
-              record.put(ops)
-            ]);
-          });
-        }
-      }
-
-      destination.onPlaceChange.add(this.handleDestinationPlaceChange);
-    },
-
-    handleDestinationRemove: function(destination) {
-      var self = this;
-
-      var index = destination.getIndex();
-      var removed = this.destRecords.splice(index, 1)[0];
-      if (this.hasValidUpstream() && removed.isValid()) {
-        debug.log('Removing destination ' + index + ':' + removed.getId());
-        this.sbw.batch(function(ops) {
-          return Promise.all([
-            self.putDestinationIds(ops),
-            removed.delete(ops)
-          ]);
-        });
-      }
-    },
-
-    updateDestinationPlace: function(destination) {
-      var self = this;
-
-      var index = destination.getIndex();
-      var record = this.destRecords[index];
-      var place = destination.getPlace();
-      var placeData = place && place.toObject();
-
-      if (record && record.isValid()) {
-        record.setPlaceData(placeData, function(placeData, oldPlace) {
-          if (self.hasValidUpstream()) {
-            debug.log('Updating destination ' + index + ':' + this.getId() +
-              '.place = ' + JSON.stringify(oldPlace) + ' => ' +
-              JSON.stringify(placeData));
-
-            self.sbw.nonBatched(this.put);
-          }
-        });
-      }
-    },
-
-    pushDestinations: function(force) {
-      var self = this;
-
-      this.sbw.batch(function(ops) {
-        if (!self.activeTripId) {
-          if (force) {
-            self.activeTripId = uuid.v4();
-          } else {
-            return;
-          }
-        }
-
-        self.setUpstream();
-
-        var asyncs = self.destRecords.map(function(record) {
-          return record.put(ops);
-        });
-        asyncs.push(self.putDestinationIds(ops));
-        return Promise.all(asyncs);
-      });
-    },
-
-    /* A note on these operations: Syncbase client operations occur
-     * asynchronously, in response to events that can rapidly change state. As
-     * such, each write operation must first check to ensure the record it's
-     * updating for is still valid (has a defined id).
-     */
-
-    putDestinationIds: function(dao) {
-      var ids = this.destRecords
-        .filter(function(r) { return r.isValid(); })
-        .map(function(r) { return r.getId(); });
-      return dao.put(['trips', this.upstreamId, 'destinations'],
-        this.marshal(ids));
-    },
-
-    marshal: function(x) {
-      return JSON.stringify(x);
-    },
-
-    unmarshal: function(x) {
-      if (!x) {
-        return x;
-      }
-
-      if (typeof x === 'object') {
-        throw new TypeError('Unexpected persisted object ' + JSON.stringify(x));
-      }
-
-      return JSON.parse(x);
-    },
-
-    truncateDestinations: function(targetLength) {
-      if (this.destinations.count() > targetLength) {
-        debug.log('Truncating destinations to ' + targetLength);
-      }
-
-      while (this.destinations.count() > targetLength) {
-        var last = this.destinations.count() - 1;
-        this.destRecords[last].invalidate();
-        this.destinations.remove(last);
-      }
-    },
-
-    processMessages: function(messageData) {
-      var self = this;
-
-      if (messageData) {
-        /* Dispatch new messages in time order, though don't put them before
-         * local messages. */
-        var newMessages = [];
-        $.each(messageData, function(id, serializedMessage) {
-          if (!self.messages[id]) {
-            var message = self.unmarshal(serializedMessage);
-            newMessages.push(message);
-            self.messages[id] = message;
-          }
-        });
-        newMessages.sort(function(a, b) {
-          return a.timestamp < b.timestamp? -1 :
-                 a.timestamp > b.timestamp?  1 :
-                                             0;
-        });
-
-        this.onMessages(newMessages);
-      }
-    },
-
-    getDestinationIds: function(destinationsData) {
-      return this.unmarshal(typeof destinationsData === 'object'?
-        destinationsData._ : destinationsData);
-    },
-
-    processDestinations: function(destinationsData) {
-      var self = this;
-
-      if (!destinationsData) {
-        if (this.hasValidUpstream()) {
-          this.truncateDestinations(0);
-        } else {
-          //first push with no remote data; push local data as authority
-          this.pushDestinations();
-        }
-
-      } else {
-        var ids;
-        try {
-          ids = this.getDestinationIds(destinationsData);
-          if (!ids) {
-            throw new TypeError('Missing destination IDs');
-          }
-        } catch(e) {
-          this.onError(e);
-          //assume it's corrupt and overwrite
-          this.pushDestinations(true);
-          return;
-        }
-
-        $.each(ids, function(i, id) {
-          /* Don't bother reordering existing destinations by ID; instead, just
-           * overwrite everything. TODO(rosswang): optimize to reorder. */
-          var record = self.destRecords[i];
-          var destination = self.destinations.get(i);
-
-          if (!record) {
-            /* Add the record invalid so that the destination add handler leaves
-             * population to this handler. */
-            record = self.destRecords[i] = self.destinationRecord();
-            destination = self.destinations.add(i);
-          }
-
-          if (record.getId() !== id) {
-            record.setId(id);
-            debug.log('Pulling destination ' + i + ':' + id);
-          }
-
-          var destinationData = destinationsData[id];
-          var newPlace = destinationData &&
-            self.unmarshal(destinationData.place);
-
-          record.setPlaceData(newPlace, function(newPlace, oldPlace) {
-            debug.log('Pulled update for destination ' + i + ':' + id +
-              '.place = ' + JSON.stringify(oldPlace) + ' => ' +
-              JSON.stringify(newPlace));
-
-            if (newPlace) {
-              var cancelled = false;
-              record.cancelPlaceAsync.add(function() {
-                cancelled = true;
-              });
-
-              Place.fromObject(self.mapsDeps, newPlace)
-                .catch(function(err) {
-                  //assume it's corrupt and overwrite
-                  if (!cancelled) {
-                    self.updateDestinationPlace(destination);
-                    throw err;
-                  }
-                })
-                .then(function(place) {
-                  if (!cancelled) {
-                    destination.setPlace(place);
-                  }
-                }).catch(function(err) {
-                  self.onError(err);
-                });
-            } else {
-              destination.setPlace(null);
-            }
-          });
-        });
-
-        if (this.destRecords.length > ids.length) {
-          this.truncateDestinations(ids.length);
-        }
-      }
-
-      this.setUpstream();
-    },
-
-    deleteTrip: function(tripId) {
-      this.sbw.batch(function(ops) {
-        return Promise.all([
-          ops.delete(['user', 'tripMetadata', tripId]),
-          ops.delete(['trips', tripId])
-        ]);
-      });
-    },
-
-    /**
-     * Given a mapping of trip IDs to trip info with metadata, pick the trip
-     * that the user is most likely to care about.
-     */
-    getDefaultTrip: function(userTripMetadata, trips) {
-      var self = this;
-      var best = {};
-
-      $.each(trips, function(id, trip) {
-        var md = userTripMetadata && userTripMetadata[id];
-        var latestSwitch = md && md.latestSwitch;
-
-        function usurp() {
-          best.trip = trip;
-          best.id = id;
-          best.latestSwitch = latestSwitch;
-          delete best.length;
-        }
-
-        if (latestSwitch === best.latestSwitch) {
-          if (!best.trip) {
-            usurp();
-          } else {
-            if (best.length === undefined) {
-              best.length = self.getDestinationIds(
-                best.trip.destinations).length;
-            }
-            var length = self.getDestinationIds(trip.destinations).length;
-            if (length > best.length) {
-              usurp();
-              best.length = length;
-            } else if (length === best.length && id < best.id) {
-              usurp();
-              best.length = length;
-            }
-          }
-        } else if (latestSwitch && best.latestSwitch === undefined ||
-            latestSwitch > best.latestSwitch) {
-          usurp();
-        }
-      });
-
-      return best.id;
-    },
-
-    isNascent: function(trip) {
-      return !trip.destinations ||
-        this.getDestinationIds(trip.destinations).length <= 1;
-    },
-
-    manageTripSyncGroups: function(trips) {
-      var self = this;
-
-      //TODO(rosswang): maybe make this more intelligent, and handle ejection
-      if (trips) {
-        $.each(trips, function(tripId, trip) {
-          /* Join is idempotent, but repeatedly joining might be causing major,
-           * fatal sluggishness. TODO(rosswang): if this is not the case, maybe
-           * go ahead and poll. */
-          if (!self.joinedTrips.has(tripId) && trip.owner) {
-            self.joinedTrips.add(tripId);
-            self.joinTripSyncGroup(trip.owner, tripId).catch(self.onError);
-          }
-        });
-      }
-    },
-
-    processTrips: function(userTripMetadata, trips) {
-      var self = this;
-
-      this.manageTripSyncGroups(trips);
-
-      var trip;
-
-      if (this.awaitedTripId) {
-        this.setActiveTripId(this.awaitedTripId);
-        delete this.awaitedTripId;
-
-        /* Override latestSwitch this frame. (Subsequently syncbase will be up
-         * to date.) */
-        if (!userTripMetadata) {
-          userTripMetadata = {};
-        }
-        var activeTripMd = userTripMetadata[this.activeTripId];
-        if (!activeTripMd) {
-          activeTripMd = userTripMetadata[this.activeTripId] = {};
-        }
-        activeTripMd.latestSwitch = Date.now();
-      }
-
-      if (this.activeTripId) {
-        trip = trips && trips[this.activeTripId];
-        if (!trip) {
-          debug.log('Last active trip ' + this.activeTripId +
-            ' is no longer present.');
-        } else {
-          var defaultId = this.getDefaultTrip(userTripMetadata, trips);
-          if (defaultId && defaultId !== this.activeTripId &&
-              trips[defaultId]) {
-            if (this.isNascent(trip)) {
-              this.deleteTrip(this.activeTripId);
-              debug.log('Replacing nascent trip ' + this.activeTripId +
-                ' with established trip ' + defaultId);
-            } else {
-              /* TODO(rosswang): for now, sync trip changes. This behavior may
-               * change. */
-              debug.log('Replacing active trip ' + this.activeTripId +
-                ' with most recent selection ' + defaultId);
-            }
-
-            this.activeTripId = defaultId;
-            trip = trips[defaultId];
-          }
-        }
-      }
-
-      if (!trip) {
-        if (trips) {
-          this.activeTripId = this.getDefaultTrip(userTripMetadata, trips);
-          debug.log('Setting active trip ' + this.activeTripId);
-          trip = trips[this.activeTripId];
-        } else {
-          var tripId = this.activeTripId = uuid.v4();
-          debug.log('Creating new trip ' + tripId);
-          trip = {}; //don't initialize owner until the syncgroup is ready
-          this.startSyncgroupManager.then(function(gm) {
-            return self.createTripSyncGroup(gm, tripId)
-              .then(function(sg) {
-                return gm.syncbaseWrapper.put(['trips', tripId, 'owner'],
-                  self.invitationManager.getUsername()).then(function() {
-                    return sg;
-                  });
-              })
-              .catch(self.onError);
-          });
-        }
-      }
-
-      this.activeTripOwner = trip.owner;
-      this.processMessages(trip.messages);
-      this.processDestinations(trip.destinations);
-    },
-
     processUpdates: function(data) {
-      this.processTrips(data.user && data.user.tripMetadata, data.trips);
-    },
+      this.tripManager.processTrips(data.user && data.user.tripMetadata,
+        data.trips);
 
-    hasValidUpstream: function() {
-      return this.upstreamId && this.upstreamId === this.activeTripId;
-    },
+      this.messageSync.processMessages(this.tripManager.getMessageData());
+      this.destinationSync.processDestinations(
+        this.tripManager.getDestinationData());
 
-    setUpstream: function() {
-      this.upstreamId = this.activeTripId;
+      this.tripManager.setUpstream();
     },
 
     serve: function(args) {
@@ -641,11 +125,6 @@
           self.status.usersSyncGroup = 'failed';
           throw err;
         });
-    },
-
-    createTripSyncGroup: function(groupManager, tripId) {
-      return groupManager.createSyncGroup('trip-' + tripId,
-        [['trips', tripId]]);
     }
   },
 
@@ -683,14 +162,10 @@
   init: function(prereqs, mapsDependencies, syncbaseName) {
     var self = this;
 
-    this.mapsDeps = mapsDependencies;
     this.syncbaseName = syncbaseName;
 
     this.tripStatus = {};
-    this.messages = {};
-    this.destRecords = [];
     this.status = {};
-    this.joinedTrips = new Set();
 
     this.server = new vdlTravel.TravelSync();
     var startRpc = prereqs.then(this.serve);
@@ -708,6 +183,18 @@
     var createPrimarySyncGroup = this.startSyncgroupManager
       .then(this.createPrimarySyncGroup);
 
+    var usernamePromise = prereqs.then(function(args) {
+      return args.identity.username;
+    });
+
+    this.tripManager = new TripManager(
+      usernamePromise, sbw, this.startSyncgroupManager);
+    this.messageSync = new MessageSync(sbw, this.tripManager);
+    this.destinationSync = new DestinationSync(
+      mapsDependencies, sbw, this.tripManager);
+
+    this.messageSync.onMessages.add(this.onMessages);
+
     this.startup = Promise.all([
         startRpc,
         startSyncbase,
@@ -721,13 +208,9 @@
         };
       });
 
-    this.invitationManager = new InvitationManager(prereqs,
+    this.invitationManager = new InvitationManager(usernamePromise,
       this.startSyncgroupManager);
     this.invitationManager.onError.add(this.onError);
-
-    this.handleDestinationPlaceChange = function() {
-      self.updateDestinationPlace(this);
-    };
   }
 });