blob: 670b0e38a78597bfc041c4a1608ff39b5e546f46 [file] [log] [blame]
// Copyright 2015 The Vanadium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
require('es6-shim');
var $ = require('../util/jquery');
var defineClass = require('../util/define-class');
var uuid = require('uuid');
var marshalling = require('./marshalling');
function cmp(a, b) {
return a.timestamp - b.timestamp;
}
var MessageSync = defineClass({
publics: {
message: function(messageContent) {
var id = uuid.v4();
var payload = $.extend({
timestamp: Date.now()
}, messageContent);
var value = marshalling.marshal(payload);
this.sbw.put(this.tripManager.getMessagesKey(id), value);
}
},
privates: {
processMessage: function(k, v) {
var id = k[k.length - 1];
if (!this.messageIds.has(id)) {
this.messageIds.add(id);
this.messageBatch.push(marshalling.unmarshal(v));
}
},
endBatch: function() {
this.messageBatch.sort(cmp);
this.onMessages(this.messageBatch);
this.messageBatch = [];
},
setPrefix: function(prefix) {
var self = this;
return this.sbw.getRawWatched(prefix, {
onData: this.processMessage,
onError: this.onError
}, {
onPut: this.processMessage,
onBatchEnd: this.endBatch,
onError: this.onError,
onClose: function(err) {
if (err) {
self.onError(err);
}
}
}).then(this.endBatch, this.onError);
},
refresh: function() {
return this.setPrefix(this.tripManager.getMessagesKey());
}
},
events: {
onMessages: '',
onError: 'memory'
},
init: function(deferredSyncbaseWrapper, tripManager) {
this.sbw = deferredSyncbaseWrapper;
this.tripManager = tripManager;
this.messageIds = new Set();
this.messageBatch = [];
tripManager.onTripChange.add(this.refresh);
}
});
module.exports = MessageSync;