wspr: Share an encoder and decoder for all js clients.

This improves RPC performance by 50% because the most expensive part
of making an rpc is encoding the type message.  This is really an
interim change, since the real solution is to have a separate type
flow that is shared by all messages from wspr to go.

This improves: https://github.com/vanadium/issues/issues/421

MultiPart: 2/3
Change-Id: Id9ebe77fbe04c143b2728af24003082d810d7ff4
diff --git a/services/wspr/internal/app/app.go b/services/wspr/internal/app/app.go
index 05da65a..22de904 100644
--- a/services/wspr/internal/app/app.go
+++ b/services/wspr/internal/app/app.go
@@ -7,8 +7,6 @@
 package app
 
 import (
-	"bytes"
-	"encoding/hex"
 	"fmt"
 	"io"
 	"reflect"
@@ -99,6 +97,14 @@
 	// are objects that serve requests in wspr without actually making
 	// an outgoing rpc call.
 	reservedServices map[string]rpc.Invoker
+
+	encoderLock   sync.Mutex
+	clientEncoder *vom.Encoder
+	clientWriter  *lib.ProxyWriter
+
+	decoderLock   sync.Mutex
+	clientDecoder *vom.Decoder
+	clientReader  *lib.ProxyReader
 }
 
 // NewController creates a new Controller.  writerCreator will be used to create a new flow for rpcs to
@@ -205,11 +211,14 @@
 		OutArgs:       results,
 		TraceResponse: vtrace.GetResponse(ctx),
 	}
-	encoded, err := lib.VomEncode(response)
-	if err != nil {
+	c.encoderLock.Lock()
+	if err := c.clientEncoder.Encode(response); err != nil {
+		c.encoderLock.Unlock()
 		w.Error(err)
 		return
 	}
+	encoded := c.clientWriter.ConsumeBuffer()
+	c.encoderLock.Unlock()
 	if err := w.Send(lib.ResponseFinal, encoded); err != nil {
 		w.Error(verror.Convert(marshallingError, ctx, err))
 	}
@@ -350,6 +359,10 @@
 	c.outstandingRequests = make(map[int32]*outstandingRequest)
 	c.flowMap = make(map[int32]interface{})
 	c.servers = make(map[uint32]*server.Server)
+	c.clientReader = lib.NewProxyReader()
+	c.clientDecoder = vom.NewDecoder(c.clientReader)
+	c.clientWriter = lib.NewProxyWriter()
+	c.clientEncoder = vom.NewEncoder(c.clientWriter)
 }
 
 // SendOnStream writes data on id's stream.  The actual network write will be
@@ -459,40 +472,42 @@
 func (l *localCall) RemoteEndpoint() naming.Endpoint                 { return nil }
 func (l *localCall) Security() security.Call                         { return l }
 
-func (c *Controller) handleInternalCall(ctx *context.T, invoker rpc.Invoker, msg *RpcRequest, decoder *vom.Decoder, w lib.ClientWriter, span vtrace.Span) {
+func (c *Controller) handleInternalCall(ctx *context.T, invoker rpc.Invoker, msg *RpcRequest, w lib.ClientWriter, span vtrace.Span) {
 	argptrs, tags, err := invoker.Prepare(msg.Method, int(msg.NumInArgs))
 	if err != nil {
 		w.Error(verror.Convert(verror.ErrInternal, ctx, err))
 		return
 	}
 	for _, argptr := range argptrs {
-		if err := decoder.Decode(argptr); err != nil {
+		if err := c.clientDecoder.Decode(argptr); err != nil {
 			w.Error(verror.Convert(verror.ErrInternal, ctx, err))
 			return
 		}
 	}
-	results, err := invoker.Invoke(ctx, &localCall{ctx, msg, tags, w}, msg.Method, argptrs)
-	if err != nil {
-		w.Error(verror.Convert(verror.ErrInternal, ctx, err))
-		return
-	}
-	if msg.IsStreaming {
-		if err := w.Send(lib.ResponseStreamClose, nil); err != nil {
-			w.Error(verror.New(marshallingError, ctx, "ResponseStreamClose"))
-		}
-	}
-
-	// Convert results from []interface{} to []*vdl.Value.
-	vresults := make([]*vdl.Value, len(results))
-	for i, res := range results {
-		vv, err := vdl.ValueFromReflect(reflect.ValueOf(res))
+	go func() {
+		results, err := invoker.Invoke(ctx, &localCall{ctx, msg, tags, w}, msg.Method, argptrs)
 		if err != nil {
 			w.Error(verror.Convert(verror.ErrInternal, ctx, err))
 			return
 		}
-		vresults[i] = vv
-	}
-	c.sendRPCResponse(ctx, w, span, vresults)
+		if msg.IsStreaming {
+			if err := w.Send(lib.ResponseStreamClose, nil); err != nil {
+				w.Error(verror.New(marshallingError, ctx, "ResponseStreamClose"))
+			}
+		}
+
+		// Convert results from []interface{} to []*vdl.Value.
+		vresults := make([]*vdl.Value, len(results))
+		for i, res := range results {
+			vv, err := vdl.ValueFromReflect(reflect.ValueOf(res))
+			if err != nil {
+				w.Error(verror.Convert(verror.ErrInternal, ctx, err))
+				return
+			}
+			vresults[i] = vv
+		}
+		c.sendRPCResponse(ctx, w, span, vresults)
+	}()
 }
 
 // HandleCaveatValidationResponse handles the response to caveat validation
@@ -510,14 +525,15 @@
 
 // HandleVeyronRequest starts a vanadium rpc and returns before the rpc has been completed.
 func (c *Controller) HandleVeyronRequest(ctx *context.T, id int32, data string, w lib.ClientWriter) {
-	binbytes, err := hex.DecodeString(data)
+	c.decoderLock.Lock()
+	defer c.decoderLock.Unlock()
+	err := c.clientReader.ReplaceBuffer(data)
 	if err != nil {
 		w.Error(verror.Convert(verror.ErrInternal, ctx, fmt.Errorf("Error decoding hex string %q: %v", data, err)))
 		return
 	}
-	decoder := vom.NewDecoder(bytes.NewReader(binbytes))
 	var msg RpcRequest
-	if err := decoder.Decode(&msg); err != nil {
+	if err := c.clientDecoder.Decode(&msg); err != nil {
 		w.Error(verror.Convert(verror.ErrInternal, ctx, err))
 		return
 	}
@@ -540,14 +556,14 @@
 
 	// If this message is for an internal service, do a short-circuit dispatch here.
 	if invoker, ok := c.reservedServices[msg.Name]; ok {
-		go c.handleInternalCall(ctx, invoker, &msg, decoder, w, span)
+		c.handleInternalCall(ctx, invoker, &msg, w, span)
 		return
 	}
 
 	inArgs := make([]interface{}, msg.NumInArgs)
 	for i := range inArgs {
 		var v *vdl.Value
-		if err := decoder.Decode(&v); err != nil {
+		if err := c.clientDecoder.Decode(&v); err != nil {
 			w.Error(err)
 			return
 		}
diff --git a/services/wspr/internal/browspr/writer.go b/services/wspr/internal/browspr/writer.go
index 17222f1..ed4fc66 100644
--- a/services/wspr/internal/browspr/writer.go
+++ b/services/wspr/internal/browspr/writer.go
@@ -20,7 +20,6 @@
 	if err != nil {
 		return err
 	}
-
 	w.p.browspr.postMessage(w.p.instanceId, "browsprMsg", outMsg)
 	return nil
 }
diff --git a/services/wspr/internal/lib/vom.go b/services/wspr/internal/lib/vom.go
index ef1f09d..3864c4e 100644
--- a/services/wspr/internal/lib/vom.go
+++ b/services/wspr/internal/lib/vom.go
@@ -37,3 +37,39 @@
 	decoder := vom.NewDecoder(bytes.NewReader(binbytes))
 	return decoder.Decode(v)
 }
+
+// ProxyReader implements io.Reader but allows changing the underlying buffer.
+// This is useful for merging discrete messages that are part of the same flow.
+type ProxyReader struct {
+	bytes.Buffer
+}
+
+func NewProxyReader() *ProxyReader {
+	return &ProxyReader{}
+}
+
+func (p *ProxyReader) ReplaceBuffer(data string) error {
+	binbytes, err := hex.DecodeString(data)
+	if err != nil {
+		return err
+	}
+	p.Reset()
+	p.Write(binbytes)
+	return nil
+}
+
+// ProxyWriter implements io.Writer but allows changing the underlying buffer.
+// This is useful for merging discrete messages that are part of the same flow.
+type ProxyWriter struct {
+	bytes.Buffer
+}
+
+func NewProxyWriter() *ProxyWriter {
+	return &ProxyWriter{}
+}
+
+func (p *ProxyWriter) ConsumeBuffer() string {
+	s := hex.EncodeToString(p.Buffer.Bytes())
+	p.Reset()
+	return s
+}