js/core: shared type stream for server messages
MultiPart: 1/2
Change-Id: I3ed6ea1ddaa3c8a5a98bec473ceb6daaa870a150
diff --git a/src/proxy/index.js b/src/proxy/index.js
index b87c0d0..5fe0fca 100644
--- a/src/proxy/index.js
+++ b/src/proxy/index.js
@@ -69,6 +69,9 @@
}, function() {
return proxy._messageReader.nextMessageType(proxy.typeDecoder)
.then(function(typeId) {
+ if (typeId === null) {
+ return proxy.cleanup();
+ }
vlog.logger.error('Unexpected type id ' + typeId);
}).catch(function(err) {
vlog.logger.error('Type decoder failed' + err + ': ' + err.stack);
@@ -148,6 +151,11 @@
proxy._messageReader.addBytes(message);
return;
}
+ // The handler could have been added after we did the lookup but before
+ // this decode ran.
+ if (!handler) {
+ handler = proxy.outstandingRequests[id].handler;
+ }
if (!handler) {
handler = proxy.incomingRequestHandlers[messageType];
if (!handler) {
diff --git a/src/proxy/nacl.js b/src/proxy/nacl.js
index c144b27..ebaeb98 100644
--- a/src/proxy/nacl.js
+++ b/src/proxy/nacl.js
@@ -29,7 +29,9 @@
this._tasks = new TaskSequence();
this.onBrowsprMsg = function(msg) {
- self.process(msg.body);
+ if (self.instanceId === msg.instanceId) {
+ self.process(msg.body);
+ }
};
extensionEventProxy.on('browsprMsg', this.onBrowsprMsg);
diff --git a/src/proxy/stream.js b/src/proxy/stream.js
index 0c39ddc..81fa654 100644
--- a/src/proxy/stream.js
+++ b/src/proxy/stream.js
@@ -83,7 +83,7 @@
results: results,
err: err || null,
traceResponse: traceResponse
- }))
+ }), undefined, this._typeEncoder)
};
Duplex.prototype.write.call(this, object);
};
diff --git a/src/rpc/server-router.js b/src/rpc/server-router.js
index f893fa6..4fae75a 100644
--- a/src/rpc/server-router.js
+++ b/src/rpc/server-router.js
@@ -111,14 +111,15 @@
err: new verror.InternalError(this._rootCtx, 'Failed to decode ', e)
});
- this._proxy.sendRequest(hexVom.encode(authReply),
+ this._proxy.sendRequest(hexVom.encode(authReply, undefined,
+ this._typeEncoder),
Outgoing.AUTHORIZATION_RESPONSE, null, messageId);
return;
}
var router = this;
var decodedRequest;
- vom.decode(request).catch(function(e) {
+ vom.decode(request, false, this._typeDecoder).catch(function(e) {
return Promise.reject(new verror.InternalError(router._rootCtx,
'Failed to decode ', e));
}).then(function(req) {
@@ -131,7 +132,8 @@
// TODO(bjornick): Use the real context
err: new verror.ExistsError(ctx, 'unknown server')
});
- router._proxy.sendRequest(hexVom.encode(authReply),
+ var bytes = hexVom.encode(authReply, undefined, router._typeEncoder);
+ router._proxy.sendRequest(bytes,
Outgoing.AUTHORIZATION_RESPONSE,
null, messageId);
return;
@@ -142,14 +144,16 @@
});
}).then(function() {
var authReply = new AuthReply({});
- router._proxy.sendRequest(hexVom.encode(authReply),
+ router._proxy.sendRequest(hexVom.encode(authReply, undefined,
+ router._typeEncoder),
Outgoing.AUTHORIZATION_RESPONSE, null, messageId);
}).catch(function(e) {
var errMsg = {
- err: ErrorConversion.fromNativeValue(e, this._appName,
+ err: ErrorConversion.fromNativeValue(e, router._appName,
decodedRequest.call.method)
};
- router._proxy.sendRequest(hexVom.encode(errMsg),
+ router._proxy.sendRequest(hexVom.encode(errMsg, undefined,
+ router._typeEncoder),
Outgoing.AUTHORIZATION_RESPONSE, null,
messageId);
});
@@ -193,11 +197,12 @@
var response = new CaveatValidationResponse({
results: results
});
- var data = hexVom.encode(response);
+ var data = hexVom.encode(response, undefined, router._typeEncoder);
router._proxy.sendRequest(data, Outgoing.CAVEAT_VALIDATION_RESPONSE, null,
messageId);
});
}).catch(function(err) {
+ vlog.logger.error('Got err ' + err + ': ' + err.stack);
throw new Error('Unexpected error (all promises should resolve): ' + err);
});
};
@@ -210,7 +215,8 @@
var reply = new LookupReply({
err: new verror.NoExistError(this._rootCtx, 'unknown server')
});
- this._proxy.sendRequest(hexVom.encode(reply), Outgoing.LOOKUP_RESPONSE,
+ this._proxy.sendRequest(hexVom.encode(reply, undefined, this._typeEncoder),
+ Outgoing.LOOKUP_RESPONSE,
null, messageId);
return;
}
@@ -226,13 +232,15 @@
hasAuthorizer: hasAuthorizer,
hasGlobber: hasGlobber
});
- self._proxy.sendRequest(hexVom.encode(reply), Outgoing.LOOKUP_RESPONSE,
+ self._proxy.sendRequest(hexVom.encode(reply, undefined, self._typeEncoder),
+ Outgoing.LOOKUP_RESPONSE,
null, messageId);
}).catch(function(err) {
var reply = new LookupReply({
err: ErrorConversion.fromNativeValue(err, self._appName, '__Signature')
});
- self._proxy.sendRequest(hexVom.encode(reply), Outgoing.LOOKUP_RESPONSE,
+ self._proxy.sendRequest(hexVom.encode(reply, undefined, self._typeEncoder),
+ Outgoing.LOOKUP_RESPONSE,
null, messageId);
});
};
@@ -447,7 +455,7 @@
if (err instanceof Error && err.stack !== undefined) {
stackTrace = err.stack;
}
- vlog.logger.debug('Requested method ' + methodName +
+ vlog.logger.error('Requested method ' + methodName +
' threw an exception on invoke: ', err, stackTrace);
// The error case has no results; only send the error.
@@ -480,9 +488,11 @@
this.sendResult(messageId, '', null, err);
return;
}
- return vom.decode(request).then(function(request) {
+ return vom.decode(request, false, this._typeDecoder)
+ .then(function(request) {
return router._handleRPCRequestInternal(messageId, request);
}, function(e) {
+ vlog.logger.error('Failed to decode args : ' + e + ': ' + e.stack);
err = new Error('Failed to decode args: ' + e);
router.sendResult(messageId, '', null, err);
});
@@ -702,7 +712,9 @@
err: errorStruct,
traceResponse: traceResponse
});
- this._proxy.sendRequest(hexVom.encode(responseData), Outgoing.RESPONSE,
+ this._proxy.sendRequest(hexVom.encode(responseData, undefined,
+ this._typeEncoder),
+ Outgoing.RESPONSE,
null, messageId);
}
};