wspr: Revert 74e84b456ac998e5487646e8b2a565194146f1c5 with a fix for a
race. The issue was that we weren't holding the lock when we did the
w.Send in sendRPCResponse, which allowed the sends themselves to be
re-ordered. I'm not convinced that this is the only race, but I can't
get the namespace browser tests to fail locally any more.
MultiPart: 2/2
Change-Id: I91601bbf922156ff269199ef3c5254f5cfe438ff
diff --git a/services/wspr/internal/app/app.go b/services/wspr/internal/app/app.go
index 812fcd6..de2ebae 100644
--- a/services/wspr/internal/app/app.go
+++ b/services/wspr/internal/app/app.go
@@ -7,8 +7,6 @@
package app
import (
- "bytes"
- "encoding/hex"
"fmt"
"io"
"reflect"
@@ -99,6 +97,14 @@
// are objects that serve requests in wspr without actually making
// an outgoing rpc call.
reservedServices map[string]rpc.Invoker
+
+ encoderLock sync.Mutex
+ clientEncoder *vom.Encoder
+ clientWriter *lib.ProxyWriter
+
+ decoderLock sync.Mutex
+ clientDecoder *vom.Decoder
+ clientReader *lib.ProxyReader
}
// NewController creates a new Controller. writerCreator will be used to create a new flow for rpcs to
@@ -205,11 +211,13 @@
OutArgs: results,
TraceResponse: vtrace.GetResponse(ctx),
}
- encoded, err := lib.VomEncode(response)
- if err != nil {
+ c.encoderLock.Lock()
+ defer c.encoderLock.Unlock()
+ if err := c.clientEncoder.Encode(response); err != nil {
w.Error(err)
return
}
+ encoded := c.clientWriter.ConsumeBuffer()
if err := w.Send(lib.ResponseFinal, encoded); err != nil {
w.Error(verror.Convert(marshallingError, ctx, err))
}
@@ -350,6 +358,10 @@
c.outstandingRequests = make(map[int32]*outstandingRequest)
c.flowMap = make(map[int32]interface{})
c.servers = make(map[uint32]*server.Server)
+ c.clientReader = lib.NewProxyReader()
+ c.clientDecoder = vom.NewDecoder(c.clientReader)
+ c.clientWriter = lib.NewProxyWriter()
+ c.clientEncoder = vom.NewEncoder(c.clientWriter)
}
// SendOnStream writes data on id's stream. The actual network write will be
@@ -459,40 +471,42 @@
func (l *localCall) RemoteEndpoint() naming.Endpoint { return nil }
func (l *localCall) Security() security.Call { return l }
-func (c *Controller) handleInternalCall(ctx *context.T, invoker rpc.Invoker, msg *RpcRequest, decoder *vom.Decoder, w lib.ClientWriter, span vtrace.Span) {
+func (c *Controller) handleInternalCall(ctx *context.T, invoker rpc.Invoker, msg *RpcRequest, w lib.ClientWriter, span vtrace.Span) {
argptrs, tags, err := invoker.Prepare(msg.Method, int(msg.NumInArgs))
if err != nil {
w.Error(verror.Convert(verror.ErrInternal, ctx, err))
return
}
for _, argptr := range argptrs {
- if err := decoder.Decode(argptr); err != nil {
+ if err := c.clientDecoder.Decode(argptr); err != nil {
w.Error(verror.Convert(verror.ErrInternal, ctx, err))
return
}
}
- results, err := invoker.Invoke(ctx, &localCall{ctx, msg, tags, w}, msg.Method, argptrs)
- if err != nil {
- w.Error(verror.Convert(verror.ErrInternal, ctx, err))
- return
- }
- if msg.IsStreaming {
- if err := w.Send(lib.ResponseStreamClose, nil); err != nil {
- w.Error(verror.New(marshallingError, ctx, "ResponseStreamClose"))
- }
- }
-
- // Convert results from []interface{} to []*vdl.Value.
- vresults := make([]*vdl.Value, len(results))
- for i, res := range results {
- vv, err := vdl.ValueFromReflect(reflect.ValueOf(res))
+ go func() {
+ results, err := invoker.Invoke(ctx, &localCall{ctx, msg, tags, w}, msg.Method, argptrs)
if err != nil {
w.Error(verror.Convert(verror.ErrInternal, ctx, err))
return
}
- vresults[i] = vv
- }
- c.sendRPCResponse(ctx, w, span, vresults)
+ if msg.IsStreaming {
+ if err := w.Send(lib.ResponseStreamClose, nil); err != nil {
+ w.Error(verror.New(marshallingError, ctx, "ResponseStreamClose"))
+ }
+ }
+
+ // Convert results from []interface{} to []*vdl.Value.
+ vresults := make([]*vdl.Value, len(results))
+ for i, res := range results {
+ vv, err := vdl.ValueFromReflect(reflect.ValueOf(res))
+ if err != nil {
+ w.Error(verror.Convert(verror.ErrInternal, ctx, err))
+ return
+ }
+ vresults[i] = vv
+ }
+ c.sendRPCResponse(ctx, w, span, vresults)
+ }()
}
// HandleCaveatValidationResponse handles the response to caveat validation
@@ -510,14 +524,15 @@
// HandleVeyronRequest starts a vanadium rpc and returns before the rpc has been completed.
func (c *Controller) HandleVeyronRequest(ctx *context.T, id int32, data string, w lib.ClientWriter) {
- binbytes, err := hex.DecodeString(data)
+ c.decoderLock.Lock()
+ defer c.decoderLock.Unlock()
+ err := c.clientReader.ReplaceBuffer(data)
if err != nil {
w.Error(verror.Convert(verror.ErrInternal, ctx, fmt.Errorf("Error decoding hex string %q: %v", data, err)))
return
}
- decoder := vom.NewDecoder(bytes.NewReader(binbytes))
var msg RpcRequest
- if err := decoder.Decode(&msg); err != nil {
+ if err := c.clientDecoder.Decode(&msg); err != nil {
w.Error(verror.Convert(verror.ErrInternal, ctx, err))
return
}
@@ -540,14 +555,14 @@
// If this message is for an internal service, do a short-circuit dispatch here.
if invoker, ok := c.reservedServices[msg.Name]; ok {
- go c.handleInternalCall(ctx, invoker, &msg, decoder, w, span)
+ c.handleInternalCall(ctx, invoker, &msg, w, span)
return
}
inArgs := make([]interface{}, msg.NumInArgs)
for i := range inArgs {
var v *vdl.Value
- if err := decoder.Decode(&v); err != nil {
+ if err := c.clientDecoder.Decode(&v); err != nil {
w.Error(err)
return
}