Integrating messages with SB watch
Change-ID: If16e82c3f7d3ad8544ae18f35cca34e25dbdd257
diff --git a/README.md b/README.md
index 05d2ce0..21413e2 100644
--- a/README.md
+++ b/README.md
@@ -19,7 +19,7 @@
make
-It is possible to have the build happen automatically anytime a JavaScript file
+It is possible to have the build happen automatically any time a JavaScript file
changes using the watch tool:
watch make
diff --git a/src/sync-util/message-sync.js b/src/sync-util/message-sync.js
index 4053819..670b0e3 100644
--- a/src/sync-util/message-sync.js
+++ b/src/sync-util/message-sync.js
@@ -2,6 +2,8 @@
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
+require('es6-shim');
+
var $ = require('../util/jquery');
var defineClass = require('../util/define-class');
@@ -9,6 +11,10 @@
var marshalling = require('./marshalling');
+function cmp(a, b) {
+ return a.timestamp - b.timestamp;
+}
+
var MessageSync = defineClass({
publics: {
message: function(messageContent) {
@@ -19,40 +25,60 @@
var value = marshalling.marshal(payload);
this.sbw.put(this.tripManager.getMessagesKey(id), value);
- },
-
- processMessages: function(messageData) {
- var self = this;
-
- if (messageData) {
- /* Dispatch new messages in time order, though don't put them before
- * local messages. */
- var newMessages = [];
- $.each(messageData, function(id, serializedMessage) {
- if (!self.messages[id]) {
- var message = marshalling.unmarshal(serializedMessage);
- newMessages.push(message);
- self.messages[id] = message;
- }
- });
- newMessages.sort(function(a, b) {
- return a.timestamp < b.timestamp? -1 :
- a.timestamp > b.timestamp? 1 :
- 0;
- });
-
- this.onMessages(newMessages);
- }
}
},
- events: [ 'onMessages' ],
+ privates: {
+ processMessage: function(k, v) {
+ var id = k[k.length - 1];
+ if (!this.messageIds.has(id)) {
+ this.messageIds.add(id);
+ this.messageBatch.push(marshalling.unmarshal(v));
+ }
+ },
+
+ endBatch: function() {
+ this.messageBatch.sort(cmp);
+ this.onMessages(this.messageBatch);
+ this.messageBatch = [];
+ },
+
+ setPrefix: function(prefix) {
+ var self = this;
+
+ return this.sbw.getRawWatched(prefix, {
+ onData: this.processMessage,
+ onError: this.onError
+ }, {
+ onPut: this.processMessage,
+ onBatchEnd: this.endBatch,
+ onError: this.onError,
+ onClose: function(err) {
+ if (err) {
+ self.onError(err);
+ }
+ }
+ }).then(this.endBatch, this.onError);
+ },
+
+ refresh: function() {
+ return this.setPrefix(this.tripManager.getMessagesKey());
+ }
+ },
+
+ events: {
+ onMessages: '',
+ onError: 'memory'
+ },
init: function(deferredSyncbaseWrapper, tripManager) {
this.sbw = deferredSyncbaseWrapper;
this.tripManager = tripManager;
- this.messages = {};
+ this.messageIds = new Set();
+ this.messageBatch = [];
+
+ tripManager.onTripChange.add(this.refresh);
}
});
diff --git a/src/sync-util/trip-manager.js b/src/sync-util/trip-manager.js
index 3549b86..e8b7a22 100644
--- a/src/sync-util/trip-manager.js
+++ b/src/sync-util/trip-manager.js
@@ -28,10 +28,6 @@
this.awaitedTripId = tripId;
},
- getMessageData: function() {
- return this.activeTrip && this.activeTrip.messages;
- },
-
getDestinationData: function() {
return this.activeTrip && this.activeTrip.destinations;
},
@@ -78,7 +74,10 @@
* pushing from local.
*/
setUpstream: function() {
- this.upstreamTripId = this.activeTripId;
+ if (this.upstreamTripId !== this.activeTripId) {
+ this.upstreamTripId = this.activeTripId;
+ this.onTripChange(this.upstreamTripId);
+ }
},
processTrips: function(userTripMetadata, trips) {
@@ -186,6 +185,13 @@
}
},
+ events: [
+ /**
+ * @param tripId
+ */
+ 'onTripChange'
+ ],
+
init: function(usernamePromise, deferredSyncbaseWrapper,
startSyncgroupManager) {
this.usernamePromise = usernamePromise;
diff --git a/src/travelsync.js b/src/travelsync.js
index 69faa90..a7f04f9 100644
--- a/src/travelsync.js
+++ b/src/travelsync.js
@@ -98,7 +98,6 @@
this.tripManager.processTrips(data.user && data.user.tripMetadata,
data.trips);
- this.messageSync.processMessages(this.tripManager.getMessageData());
this.destinationSync.processDestinations(
this.tripManager.getDestinationData());
@@ -266,6 +265,7 @@
mapsDependencies, sbw, this.tripManager);
this.messageSync.onMessages.add(this.onMessages);
+ this.messageSync.onError.add(this.onError);
this.deviceSync = new DeviceSync(mapsDependencies.maps,
prereqs.then(function(args) { return args.identity; }),