Integrating messages with SB watch

Change-ID: If16e82c3f7d3ad8544ae18f35cca34e25dbdd257
diff --git a/README.md b/README.md
index 05d2ce0..21413e2 100644
--- a/README.md
+++ b/README.md
@@ -19,7 +19,7 @@
 
     make
 
-It is possible to have the build happen automatically anytime a JavaScript file
+It is possible to have the build happen automatically any time a JavaScript file
 changes using the watch tool:
 
     watch make
diff --git a/src/sync-util/message-sync.js b/src/sync-util/message-sync.js
index 4053819..670b0e3 100644
--- a/src/sync-util/message-sync.js
+++ b/src/sync-util/message-sync.js
@@ -2,6 +2,8 @@
 // Use of this source code is governed by a BSD-style
 // license that can be found in the LICENSE file.
 
+require('es6-shim');
+
 var $ = require('../util/jquery');
 var defineClass = require('../util/define-class');
 
@@ -9,6 +11,10 @@
 
 var marshalling = require('./marshalling');
 
+function cmp(a, b) {
+  return a.timestamp - b.timestamp;
+}
+
 var MessageSync = defineClass({
   publics: {
     message: function(messageContent) {
@@ -19,40 +25,60 @@
       var value = marshalling.marshal(payload);
 
       this.sbw.put(this.tripManager.getMessagesKey(id), value);
-    },
-
-    processMessages: function(messageData) {
-      var self = this;
-
-      if (messageData) {
-        /* Dispatch new messages in time order, though don't put them before
-         * local messages. */
-        var newMessages = [];
-        $.each(messageData, function(id, serializedMessage) {
-          if (!self.messages[id]) {
-            var message = marshalling.unmarshal(serializedMessage);
-            newMessages.push(message);
-            self.messages[id] = message;
-          }
-        });
-        newMessages.sort(function(a, b) {
-          return a.timestamp < b.timestamp? -1 :
-                 a.timestamp > b.timestamp?  1 :
-                                             0;
-        });
-
-        this.onMessages(newMessages);
-      }
     }
   },
 
-  events: [ 'onMessages' ],
+  privates: {
+    processMessage: function(k, v) {
+      var id = k[k.length - 1];
+      if (!this.messageIds.has(id)) {
+        this.messageIds.add(id);
+        this.messageBatch.push(marshalling.unmarshal(v));
+      }
+    },
+
+    endBatch: function() {
+      this.messageBatch.sort(cmp);
+      this.onMessages(this.messageBatch);
+      this.messageBatch = [];
+    },
+
+    setPrefix: function(prefix) {
+      var self = this;
+
+      return this.sbw.getRawWatched(prefix, {
+        onData: this.processMessage,
+        onError: this.onError
+      }, {
+        onPut: this.processMessage,
+        onBatchEnd: this.endBatch,
+        onError: this.onError,
+        onClose: function(err) {
+          if (err) {
+            self.onError(err);
+          }
+        }
+      }).then(this.endBatch, this.onError);
+    },
+
+    refresh: function() {
+      return this.setPrefix(this.tripManager.getMessagesKey());
+    }
+  },
+
+  events: {
+    onMessages: '',
+    onError: 'memory'
+  },
 
   init: function(deferredSyncbaseWrapper, tripManager) {
     this.sbw = deferredSyncbaseWrapper;
     this.tripManager = tripManager;
 
-    this.messages = {};
+    this.messageIds = new Set();
+    this.messageBatch = [];
+
+    tripManager.onTripChange.add(this.refresh);
   }
 });
 
diff --git a/src/sync-util/trip-manager.js b/src/sync-util/trip-manager.js
index 3549b86..e8b7a22 100644
--- a/src/sync-util/trip-manager.js
+++ b/src/sync-util/trip-manager.js
@@ -28,10 +28,6 @@
       this.awaitedTripId = tripId;
     },
 
-    getMessageData: function() {
-      return this.activeTrip && this.activeTrip.messages;
-    },
-
     getDestinationData: function() {
       return this.activeTrip && this.activeTrip.destinations;
     },
@@ -78,7 +74,10 @@
      * pushing from local.
      */
     setUpstream: function() {
-      this.upstreamTripId = this.activeTripId;
+      if (this.upstreamTripId !== this.activeTripId) {
+        this.upstreamTripId = this.activeTripId;
+        this.onTripChange(this.upstreamTripId);
+      }
     },
 
     processTrips: function(userTripMetadata, trips) {
@@ -186,6 +185,13 @@
     }
   },
 
+  events: [
+    /**
+     * @param tripId
+     */
+    'onTripChange'
+  ],
+
   init: function(usernamePromise, deferredSyncbaseWrapper,
       startSyncgroupManager) {
     this.usernamePromise = usernamePromise;
diff --git a/src/travelsync.js b/src/travelsync.js
index 69faa90..a7f04f9 100644
--- a/src/travelsync.js
+++ b/src/travelsync.js
@@ -98,7 +98,6 @@
       this.tripManager.processTrips(data.user && data.user.tripMetadata,
         data.trips);
 
-      this.messageSync.processMessages(this.tripManager.getMessageData());
       this.destinationSync.processDestinations(
         this.tripManager.getDestinationData());
 
@@ -266,6 +265,7 @@
       mapsDependencies, sbw, this.tripManager);
 
     this.messageSync.onMessages.add(this.onMessages);
+    this.messageSync.onError.add(this.onError);
 
     this.deviceSync = new DeviceSync(mapsDependencies.maps,
         prereqs.then(function(args) { return args.identity; }),