veyron/services/wspr: Support cancellation and deadlines to and from javascript.
Change-Id: Ie6eb0ff489a04a06572500efd44f4150c97dc045
diff --git a/services/wsprd/app/app.go b/services/wsprd/app/app.go
index f58254e..139a124 100644
--- a/services/wsprd/app/app.go
+++ b/services/wsprd/app/app.go
@@ -67,6 +67,7 @@
InArgs []json.RawMessage
NumOutArgs int32
IsStreaming bool
+ Timeout int64
}
type veyronRPC struct {
@@ -98,6 +99,11 @@
Name string
}
+type outstandingRequest struct {
+ stream *outstandingStream
+ cancel context.CancelFunc
+}
+
// Controller represents all the state of a Veyron Web App. This is the struct
// that is in charge performing all the veyron options.
type Controller struct {
@@ -118,8 +124,9 @@
// by the client.
lastGeneratedId int64
- // Streams for the outstanding requests.
- outstandingStreams map[int64]*outstandingStream
+ // Used to keep track of data (streams and cancellation functions) for
+ // outstanding requests.
+ outstandingRequests map[int64]*outstandingRequest
// Maps flowids to the server that owns them.
flowMap map[int64]*server.Server
@@ -260,20 +267,22 @@
c.flowMap[id] = s
os := newStream()
os.init(stream, vom_wiretype.Type{ID: 1})
- c.outstandingStreams[id] = os
+ c.outstandingRequests[id] = &outstandingRequest{
+ stream: os,
+ }
return &server.Flow{ID: id, Writer: c.writerCreator(id)}
}
// CleanupFlow removes the bookkeping for a previously created flow.
func (c *Controller) CleanupFlow(id int64) {
c.Lock()
- stream := c.outstandingStreams[id]
- delete(c.outstandingStreams, id)
+ request := c.outstandingRequests[id]
+ delete(c.outstandingRequests, id)
delete(c.flowMap, id)
c.Unlock()
- if stream != nil {
- stream.end()
- stream.waitUntilDone()
+ if request != nil && request.stream != nil {
+ request.stream.end()
+ request.stream.waitUntilDone()
}
}
@@ -301,8 +310,13 @@
c.Lock()
defer c.Unlock()
- for _, stream := range c.outstandingStreams {
- stream.end()
+ for _, request := range c.outstandingRequests {
+ if request.cancel != nil {
+ request.cancel()
+ }
+ if request.stream != nil {
+ request.stream.end()
+ }
}
for _, server := range c.servers {
@@ -314,7 +328,7 @@
func (c *Controller) setup() {
c.signatureManager = lib.NewSignatureManager()
- c.outstandingStreams = make(map[int64]*outstandingStream)
+ c.outstandingRequests = make(map[int64]*outstandingRequest)
c.flowMap = make(map[int64]*server.Server)
c.servers = make(map[uint64]*server.Server)
}
@@ -323,14 +337,14 @@
// done asynchronously. If there is an error, it will be sent to w.
func (c *Controller) SendOnStream(id int64, data string, w lib.ClientWriter) {
c.Lock()
- stream := c.outstandingStreams[id]
+ request := c.outstandingRequests[id]
c.Unlock()
- if stream == nil {
+ if request == nil || request.stream == nil {
vlog.Errorf("unknown stream: %d", id)
return
}
- stream.send(data, w)
+ request.stream.send(data, w)
}
// SendVeyronRequest makes a veyron request for the given flowId. If signal is non-nil, it will receive
@@ -396,11 +410,14 @@
}
c.finishCall(ctx, w, call, &msg)
- if stream != nil {
- c.Lock()
- delete(c.outstandingStreams, id)
- c.Unlock()
+ c.Lock()
+ if request, ok := c.outstandingRequests[id]; ok {
+ delete(c.outstandingRequests, id)
+ if request.cancel != nil {
+ request.cancel()
+ }
}
+ c.Unlock()
}
// HandleVeyronRequest starts a veyron rpc and returns before the rpc has been completed.
@@ -411,32 +428,56 @@
return
}
- // If this rpc is streaming, we would expect that the client would try to send
- // on this stream. Since the initial handshake is done asynchronously, we have
- // to put the outstanding stream in the map before we make the async call so that
- // the future send know which queue to write to, even if the client call isn't
- // actually ready yet.
- var stream *outstandingStream
- if veyronTempMsg.IsStreaming {
- stream = newStream()
- c.Lock()
- c.outstandingStreams[id] = stream
- c.Unlock()
+ var cctx context.T
+ var cancel context.CancelFunc
+
+ // TODO(mattr): To be consistent with go, we should not ignore 0 timeouts.
+ // However as a rollout strategy we must, otherwise there is a circular
+ // dependency between the WSPR change and the JS change that will follow.
+ if veyronTempMsg.Timeout == lib.JSIPCNoTimeout || veyronTempMsg.Timeout == 0 {
+ cctx, cancel = ctx.WithCancel()
+ } else {
+ cctx, cancel = ctx.WithTimeout(lib.JSToGoDuration(veyronTempMsg.Timeout))
}
- go c.sendVeyronRequest(ctx, id, veyronTempMsg, w, stream)
+ var stream *outstandingStream
+
+ request := &outstandingRequest{
+ cancel: cancel,
+ }
+ if veyronTempMsg.IsStreaming {
+ // If this rpc is streaming, we would expect that the client would try to send
+ // on this stream. Since the initial handshake is done asynchronously, we have
+ // to put the outstanding stream in the map before we make the async call so that
+ // the future send know which queue to write to, even if the client call isn't
+ // actually ready yet.
+ request.stream = newStream()
+ }
+ c.Lock()
+ c.outstandingRequests[id] = request
+ c.Unlock()
+
+ go c.sendVeyronRequest(cctx, id, veyronTempMsg, w, stream)
+}
+
+// HandleVeyronCancellation cancels the request corresponding to the
+// given id if it is still outstanding.
+func (c *Controller) HandleVeyronCancellation(id int64) {
+ c.Lock()
+ defer c.Unlock()
+ if request, ok := c.outstandingRequests[id]; ok && request.cancel != nil {
+ request.cancel()
+ }
}
// CloseStream closes the stream for a given id.
func (c *Controller) CloseStream(id int64) {
c.Lock()
defer c.Unlock()
- stream := c.outstandingStreams[id]
- if stream == nil {
- c.logger.Errorf("close called on non-existent call: %v", id)
+ if request, ok := c.outstandingRequests[id]; ok && request.stream != nil {
+ request.stream.end()
return
}
-
- stream.end()
+ c.logger.Errorf("close called on non-existent call: %v", id)
}
func (c *Controller) maybeCreateServer(serverId uint64) (*server.Server, error) {
diff --git a/services/wsprd/app/app_test.go b/services/wsprd/app/app_test.go
index 3297124..9425760 100644
--- a/services/wsprd/app/app_test.go
+++ b/services/wsprd/app/app_test.go
@@ -249,7 +249,9 @@
var stream *outstandingStream
if len(test.streamingInputs) > 0 {
stream = newStream()
- controller.outstandingStreams[0] = stream
+ controller.outstandingRequests[0] = &outstandingRequest{
+ stream: stream,
+ }
go func() {
for _, value := range test.streamingInputs {
controller.SendOnStream(0, value, &writer)
@@ -267,7 +269,9 @@
}
controller.sendVeyronRequest(r.NewContext(), 0, &request, &writer, stream)
- testwriter.CheckResponses(&writer, test.expectedStream, test.expectedError, t)
+ if err := testwriter.CheckResponses(&writer, test.expectedStream, test.expectedError); err != nil {
+ t.Error(err)
+ }
}
func TestCallingGoServer(t *testing.T) {
@@ -666,7 +670,9 @@
var result interface{}
var err2 error
err := call.Finish(&result, &err2)
- testwriter.CheckResponses(rt.writer, expectedWebsocketMessage, nil, t)
+ if err := testwriter.CheckResponses(rt.writer, expectedWebsocketMessage, nil); err != nil {
+ t.Error(err)
+ }
// We can't do a deep equal with authError because the error returned by the
// authorizer is wrapped into another error by the ipc framework.
if err == nil {
@@ -732,8 +738,9 @@
"Handle": 0.0,
"Args": test.inArgs,
"Context": map[string]interface{}{
- "Name": "adder",
- "Suffix": "adder",
+ "Name": "adder",
+ "Suffix": "adder",
+ "Timeout": float64(lib.JSIPCNoTimeout),
"RemoteBlessings": map[string]interface{}{
"Handle": expectedBlessingsHandle,
"PublicKey": publicKey,
@@ -800,7 +807,9 @@
t.Errorf("didn't receive expected message: %v", err)
}
- testwriter.CheckResponses(rt.writer, expectedWebsocketMessage, nil, t)
+ if err := testwriter.CheckResponses(rt.writer, expectedWebsocketMessage, nil); err != nil {
+ t.Error(err)
+ }
}
func TestSimpleJSServer(t *testing.T) {
diff --git a/services/wsprd/ipc/server/dispatcher_test.go b/services/wsprd/ipc/server/dispatcher_test.go
index 2260d8e..f07d1e1 100644
--- a/services/wsprd/ipc/server/dispatcher_test.go
+++ b/services/wsprd/ipc/server/dispatcher_test.go
@@ -107,7 +107,9 @@
},
},
}
- testwriter.CheckResponses(&flowFactory.writer, expectedResponses, nil, t)
+ if err := testwriter.CheckResponses(&flowFactory.writer, expectedResponses, nil); err != nil {
+ t.Error(err)
+ }
}
func TestSuccessfulLookupWithAuthorizer(t *testing.T) {
@@ -155,7 +157,9 @@
},
},
}
- testwriter.CheckResponses(&flowFactory.writer, expectedResponses, nil, t)
+ if err := testwriter.CheckResponses(&flowFactory.writer, expectedResponses, nil); err != nil {
+ t.Error(err)
+ }
}
func TestFailedLookup(t *testing.T) {
@@ -186,5 +190,7 @@
},
},
}
- testwriter.CheckResponses(&flowFactory.writer, expectedResponses, nil, t)
+ if err := testwriter.CheckResponses(&flowFactory.writer, expectedResponses, nil); err != nil {
+ t.Error(err)
+ }
}
diff --git a/services/wsprd/ipc/server/server.go b/services/wsprd/ipc/server/server.go
index aca1eee..f879a26 100644
--- a/services/wsprd/ipc/server/server.go
+++ b/services/wsprd/ipc/server/server.go
@@ -5,6 +5,7 @@
import (
"encoding/json"
"sync"
+ "time"
vsecurity "veyron.io/veyron/veyron/security"
"veyron.io/wspr/veyron/services/wsprd/identity"
@@ -41,6 +42,7 @@
// TODO(ataly, ashankar, bjornick): Remove this field once the old security model is killed.
RemoteID identity.PublicIDHandle
+ Timeout int64 // The time period (in ns) between now and the deadline.
}
// The response from the javascript server to the proxy.
@@ -157,15 +159,23 @@
s.mu.Lock()
s.outstandingServerRequests[flow.ID] = replyChan
s.mu.Unlock()
+
+ timeout := lib.JSIPCNoTimeout
+ if deadline, ok := call.Deadline(); ok {
+ timeout = lib.GoToJSDuration(deadline.Sub(time.Now()))
+ }
+
context := serverRPCRequestContext{
- Suffix: call.Suffix(),
- Name: call.Name(),
+ Suffix: call.Suffix(),
+ Name: call.Name(),
+ Timeout: timeout,
}
if s.helper.UseOldModel() {
context.RemoteID = s.convertPublicIDToHandle(call.RemoteID())
} else {
context.RemoteBlessings = s.convertBlessingsToHandle(call.RemoteBlessings())
}
+
// Send a invocation request to JavaScript
message := serverRPCRequest{
ServerId: s.id,
@@ -177,9 +187,10 @@
if err := flow.Writer.Send(lib.ResponseServerRequest, message); err != nil {
// Error in marshaling, pass the error through the channel immediately
- stdErr := verror2.Convert(verror2.Internal, nil, err).(verror2.Standard)
- replyChan <- &serverRPCReply{nil,
- &stdErr,
+ if ch := s.popServerRequest(flow.ID); ch != nil {
+ stdErr := verror2.Convert(verror2.Internal, call, err).(verror2.Standard)
+ ch <- &serverRPCReply{nil, &stdErr}
+ s.helper.CleanupFlow(flow.ID)
}
return replyChan
}
@@ -188,7 +199,24 @@
"JavaScript server with args %v, MessageId %d was assigned.",
methodName, args, flow.ID)
+ // Watch for cancellation.
+ go func() {
+ <-call.Done()
+ ch := s.popServerRequest(flow.ID)
+ if ch == nil {
+ return
+ }
+
+ // Send a cancel message to the JS server.
+ flow.Writer.Send(lib.ResponseCancel, nil)
+ s.helper.CleanupFlow(flow.ID)
+
+ err := verror2.Convert(verror2.Aborted, call, call.Err()).(verror2.Standard)
+ ch <- &serverRPCReply{nil, &err}
+ }()
+
go proxyStream(call, flow.Writer, s.helper.GetLogger())
+
return replyChan
}
}
@@ -280,17 +308,24 @@
return s.endpoint, nil
}
-func (s *Server) HandleServerResponse(id int64, data string) {
+func (s *Server) popServerRequest(id int64) chan *serverRPCReply {
s.mu.Lock()
+ defer s.mu.Unlock()
ch := s.outstandingServerRequests[id]
delete(s.outstandingServerRequests, id)
- s.mu.Unlock()
+
+ return ch
+}
+
+func (s *Server) HandleServerResponse(id int64, data string) {
+ ch := s.popServerRequest(id)
if ch == nil {
s.helper.GetLogger().Errorf("unexpected result from JavaScript. No channel "+
"for MessageId: %d exists. Ignoring the results.", id)
//Ignore unknown responses that don't belong to any channel
return
}
+
// Decode the result and send it through the channel
var serverReply serverRPCReply
if decoderErr := json.Unmarshal([]byte(data), &serverReply); decoderErr != nil {
diff --git a/services/wsprd/lib/testwriter/writer.go b/services/wsprd/lib/testwriter/writer.go
index 01f22ff..093445f 100644
--- a/services/wsprd/lib/testwriter/writer.go
+++ b/services/wsprd/lib/testwriter/writer.go
@@ -86,11 +86,12 @@
return nil
}
-func CheckResponses(w *Writer, wantStream []Response, wantErr error, t TestHarness) {
+func CheckResponses(w *Writer, wantStream []Response, wantErr error) error {
if got, want := w.Stream, wantStream; !reflect.DeepEqual(got, want) {
- t.Errorf("streams don't match: got %v, want %v", got, want)
+ return fmt.Errorf("streams don't match: got %v, want %v", got, want)
}
if got, want := w.err, wantErr; !verror2.Equal(got, want) {
- t.Errorf("unexpected error, got: %#v, expected: %#v", got, want)
+ return fmt.Errorf("unexpected error, got: %#v, expected: %#v", got, want)
}
+ return nil
}
diff --git a/services/wsprd/lib/time.go b/services/wsprd/lib/time.go
new file mode 100644
index 0000000..c9f89d3
--- /dev/null
+++ b/services/wsprd/lib/time.go
@@ -0,0 +1,23 @@
+package lib
+
+import (
+ "time"
+
+ "veyron.io/veyron/veyron2/ipc"
+)
+
+const nanosecondsPerMillisecond = 1000000
+
+// Javascript uses millisecond time units, not nanoseconds.
+// This is both because they are the native time unit, and because
+// otherwise JS numbers would not be capable of representing the
+// full time range available to Go programs.
+const JSIPCNoTimeout = ipc.NoTimeout / nanosecondsPerMillisecond
+
+func GoToJSDuration(d time.Duration) int64 {
+ return int64(d) / nanosecondsPerMillisecond
+}
+
+func JSToGoDuration(d int64) time.Duration {
+ return time.Duration(d * nanosecondsPerMillisecond)
+}
diff --git a/services/wsprd/lib/writer.go b/services/wsprd/lib/writer.go
index 7c6c18f..61a2cd5 100644
--- a/services/wsprd/lib/writer.go
+++ b/services/wsprd/lib/writer.go
@@ -10,6 +10,7 @@
ResponseStreamClose = 4
ResponseDispatcherLookup = 5
ResponseAuthRequest = 6
+ ResponseCancel = 7
)
// This is basically an io.Writer interface, that allows passing error message
diff --git a/services/wsprd/wspr/pipe.go b/services/wsprd/wspr/pipe.go
index feffa05..371fbfd 100644
--- a/services/wsprd/wspr/pipe.go
+++ b/services/wsprd/wspr/pipe.go
@@ -80,6 +80,9 @@
// A request to create a new random blessings
websocketCreateBlessings = 16
+
+ // A request to cancel an rpc initiated by the JS.
+ websocketCancel = 17
)
type websocketMessage struct {
@@ -300,6 +303,8 @@
switch msg.Type {
case websocketVeyronRequest:
p.controller.HandleVeyronRequest(ctx, msg.Id, msg.Data, ww)
+ case websocketCancel:
+ go p.controller.HandleVeyronCancellation(msg.Id)
case websocketStreamingValue:
// SendOnStream queues up the message to be sent, but doesn't do the send
// on this goroutine. We need to queue the messages synchronously so that