js/core: shared type stream for server messages

MultiPart: 1/2
Change-Id: I3ed6ea1ddaa3c8a5a98bec473ceb6daaa870a150
diff --git a/src/proxy/index.js b/src/proxy/index.js
index b87c0d0..5fe0fca 100644
--- a/src/proxy/index.js
+++ b/src/proxy/index.js
@@ -69,6 +69,9 @@
   }, function() {
     return proxy._messageReader.nextMessageType(proxy.typeDecoder)
     .then(function(typeId) {
+      if (typeId === null) {
+        return proxy.cleanup();
+      }
       vlog.logger.error('Unexpected type id ' + typeId);
     }).catch(function(err) {
       vlog.logger.error('Type decoder failed' + err + ': ' + err.stack);
@@ -148,6 +151,11 @@
       proxy._messageReader.addBytes(message);
       return;
     }
+    // The handler could have been added after we did the lookup but before
+    // this decode ran.
+    if (!handler) {
+      handler = proxy.outstandingRequests[id].handler;
+    }
     if (!handler) {
       handler = proxy.incomingRequestHandlers[messageType];
       if (!handler) {
diff --git a/src/proxy/nacl.js b/src/proxy/nacl.js
index c144b27..ebaeb98 100644
--- a/src/proxy/nacl.js
+++ b/src/proxy/nacl.js
@@ -29,7 +29,9 @@
   this._tasks = new TaskSequence();
 
   this.onBrowsprMsg = function(msg) {
-    self.process(msg.body);
+    if (self.instanceId === msg.instanceId) {
+      self.process(msg.body);
+    }
   };
 
   extensionEventProxy.on('browsprMsg', this.onBrowsprMsg);
diff --git a/src/proxy/stream.js b/src/proxy/stream.js
index 0c39ddc..81fa654 100644
--- a/src/proxy/stream.js
+++ b/src/proxy/stream.js
@@ -83,7 +83,7 @@
       results: results,
       err: err || null,
       traceResponse: traceResponse
-    }))
+    }), undefined, this._typeEncoder)
   };
   Duplex.prototype.write.call(this, object);
 };
diff --git a/src/rpc/server-router.js b/src/rpc/server-router.js
index f893fa6..4fae75a 100644
--- a/src/rpc/server-router.js
+++ b/src/rpc/server-router.js
@@ -111,14 +111,15 @@
       err: new verror.InternalError(this._rootCtx, 'Failed to decode ', e)
     });
 
-    this._proxy.sendRequest(hexVom.encode(authReply),
+    this._proxy.sendRequest(hexVom.encode(authReply, undefined,
+                                          this._typeEncoder),
                             Outgoing.AUTHORIZATION_RESPONSE, null, messageId);
     return;
   }
 
   var router = this;
   var decodedRequest;
-  vom.decode(request).catch(function(e) {
+  vom.decode(request, false, this._typeDecoder).catch(function(e) {
     return Promise.reject(new verror.InternalError(router._rootCtx,
       'Failed to decode ', e));
   }).then(function(req) {
@@ -131,7 +132,8 @@
         // TODO(bjornick): Use the real context
         err: new verror.ExistsError(ctx, 'unknown server')
       });
-      router._proxy.sendRequest(hexVom.encode(authReply),
+      var bytes = hexVom.encode(authReply, undefined, router._typeEncoder);
+      router._proxy.sendRequest(bytes,
                                 Outgoing.AUTHORIZATION_RESPONSE,
                                 null, messageId);
       return;
@@ -142,14 +144,16 @@
     });
   }).then(function() {
     var authReply = new AuthReply({});
-    router._proxy.sendRequest(hexVom.encode(authReply),
+    router._proxy.sendRequest(hexVom.encode(authReply, undefined,
+                                            router._typeEncoder),
                               Outgoing.AUTHORIZATION_RESPONSE, null, messageId);
   }).catch(function(e) {
     var errMsg = {
-      err: ErrorConversion.fromNativeValue(e, this._appName,
+      err: ErrorConversion.fromNativeValue(e, router._appName,
                                            decodedRequest.call.method)
     };
-    router._proxy.sendRequest(hexVom.encode(errMsg),
+    router._proxy.sendRequest(hexVom.encode(errMsg, undefined,
+                                            router._typeEncoder),
                               Outgoing.AUTHORIZATION_RESPONSE, null,
                               messageId);
   });
@@ -193,11 +197,12 @@
       var response = new CaveatValidationResponse({
         results: results
       });
-      var data = hexVom.encode(response);
+      var data = hexVom.encode(response, undefined, router._typeEncoder);
       router._proxy.sendRequest(data, Outgoing.CAVEAT_VALIDATION_RESPONSE, null,
         messageId);
     });
   }).catch(function(err) {
+    vlog.logger.error('Got err ' + err + ': ' + err.stack);
     throw new Error('Unexpected error (all promises should resolve): ' + err);
   });
 };
@@ -210,7 +215,8 @@
     var reply = new LookupReply({
       err: new verror.NoExistError(this._rootCtx, 'unknown server')
     });
-    this._proxy.sendRequest(hexVom.encode(reply), Outgoing.LOOKUP_RESPONSE,
+    this._proxy.sendRequest(hexVom.encode(reply, undefined, this._typeEncoder),
+                            Outgoing.LOOKUP_RESPONSE,
                             null, messageId);
     return;
   }
@@ -226,13 +232,15 @@
      hasAuthorizer: hasAuthorizer,
      hasGlobber: hasGlobber
    });
-   self._proxy.sendRequest(hexVom.encode(reply), Outgoing.LOOKUP_RESPONSE,
+   self._proxy.sendRequest(hexVom.encode(reply, undefined, self._typeEncoder),
+                           Outgoing.LOOKUP_RESPONSE,
                            null, messageId);
  }).catch(function(err) {
    var reply = new LookupReply({
      err: ErrorConversion.fromNativeValue(err, self._appName, '__Signature')
    });
-   self._proxy.sendRequest(hexVom.encode(reply), Outgoing.LOOKUP_RESPONSE,
+   self._proxy.sendRequest(hexVom.encode(reply, undefined, self._typeEncoder),
+                           Outgoing.LOOKUP_RESPONSE,
                            null, messageId);
  });
 };
@@ -447,7 +455,7 @@
       if (err instanceof Error && err.stack !== undefined) {
         stackTrace = err.stack;
       }
-      vlog.logger.debug('Requested method ' + methodName +
+      vlog.logger.error('Requested method ' + methodName +
           ' threw an exception on invoke: ', err, stackTrace);
 
       // The error case has no results; only send the error.
@@ -480,9 +488,11 @@
     this.sendResult(messageId, '', null, err);
     return;
   }
-  return vom.decode(request).then(function(request) {
+  return vom.decode(request, false, this._typeDecoder)
+  .then(function(request) {
     return router._handleRPCRequestInternal(messageId, request);
   }, function(e) {
+    vlog.logger.error('Failed to decode args : ' + e + ': ' + e.stack);
     err = new Error('Failed to decode args: ' + e);
     router.sendResult(messageId, '', null, err);
   });
@@ -702,7 +712,9 @@
       err: errorStruct,
       traceResponse: traceResponse
     });
-    this._proxy.sendRequest(hexVom.encode(responseData), Outgoing.RESPONSE,
+    this._proxy.sendRequest(hexVom.encode(responseData, undefined,
+                                          this._typeEncoder),
+                            Outgoing.RESPONSE,
                             null, messageId);
   }
 };