veyron/services/wspr: Support cancellation and deadlines to and from JavaScript.
Change-Id: Ie6eb0ff489a04a06572500efd44f4150c97dc045
diff --git a/services/wsprd/app/app.go b/services/wsprd/app/app.go
index f58254e..139a124 100644
--- a/services/wsprd/app/app.go
+++ b/services/wsprd/app/app.go
@@ -67,6 +67,7 @@
InArgs []json.RawMessage
NumOutArgs int32
IsStreaming bool
+ Timeout int64
}
type veyronRPC struct {
@@ -98,6 +99,11 @@
Name string
}
+type outstandingRequest struct {
+ stream *outstandingStream
+ cancel context.CancelFunc
+}
+
// Controller represents all the state of a Veyron Web App. This is the struct
// that is in charge performing all the veyron options.
type Controller struct {
@@ -118,8 +124,9 @@
// by the client.
lastGeneratedId int64
- // Streams for the outstanding requests.
- outstandingStreams map[int64]*outstandingStream
+ // Used to keep track of data (streams and cancellation functions) for
+ // outstanding requests.
+ outstandingRequests map[int64]*outstandingRequest
// Maps flowids to the server that owns them.
flowMap map[int64]*server.Server
@@ -260,20 +267,22 @@
c.flowMap[id] = s
os := newStream()
os.init(stream, vom_wiretype.Type{ID: 1})
- c.outstandingStreams[id] = os
+ c.outstandingRequests[id] = &outstandingRequest{
+ stream: os,
+ }
return &server.Flow{ID: id, Writer: c.writerCreator(id)}
}
// CleanupFlow removes the bookkeping for a previously created flow.
func (c *Controller) CleanupFlow(id int64) {
c.Lock()
- stream := c.outstandingStreams[id]
- delete(c.outstandingStreams, id)
+ request := c.outstandingRequests[id]
+ delete(c.outstandingRequests, id)
delete(c.flowMap, id)
c.Unlock()
- if stream != nil {
- stream.end()
- stream.waitUntilDone()
+ if request != nil && request.stream != nil {
+ request.stream.end()
+ request.stream.waitUntilDone()
}
}
@@ -301,8 +310,13 @@
c.Lock()
defer c.Unlock()
- for _, stream := range c.outstandingStreams {
- stream.end()
+ for _, request := range c.outstandingRequests {
+ if request.cancel != nil {
+ request.cancel()
+ }
+ if request.stream != nil {
+ request.stream.end()
+ }
}
for _, server := range c.servers {
@@ -314,7 +328,7 @@
func (c *Controller) setup() {
c.signatureManager = lib.NewSignatureManager()
- c.outstandingStreams = make(map[int64]*outstandingStream)
+ c.outstandingRequests = make(map[int64]*outstandingRequest)
c.flowMap = make(map[int64]*server.Server)
c.servers = make(map[uint64]*server.Server)
}
@@ -323,14 +337,14 @@
// done asynchronously. If there is an error, it will be sent to w.
func (c *Controller) SendOnStream(id int64, data string, w lib.ClientWriter) {
c.Lock()
- stream := c.outstandingStreams[id]
+ request := c.outstandingRequests[id]
c.Unlock()
- if stream == nil {
+ if request == nil || request.stream == nil {
vlog.Errorf("unknown stream: %d", id)
return
}
- stream.send(data, w)
+ request.stream.send(data, w)
}
// SendVeyronRequest makes a veyron request for the given flowId. If signal is non-nil, it will receive
@@ -396,11 +410,14 @@
}
c.finishCall(ctx, w, call, &msg)
- if stream != nil {
- c.Lock()
- delete(c.outstandingStreams, id)
- c.Unlock()
+ c.Lock()
+ if request, ok := c.outstandingRequests[id]; ok {
+ delete(c.outstandingRequests, id)
+ if request.cancel != nil {
+ request.cancel()
+ }
}
+ c.Unlock()
}
// HandleVeyronRequest starts a veyron rpc and returns before the rpc has been completed.
@@ -411,32 +428,56 @@
return
}
- // If this rpc is streaming, we would expect that the client would try to send
- // on this stream. Since the initial handshake is done asynchronously, we have
- // to put the outstanding stream in the map before we make the async call so that
- // the future send know which queue to write to, even if the client call isn't
- // actually ready yet.
- var stream *outstandingStream
- if veyronTempMsg.IsStreaming {
- stream = newStream()
- c.Lock()
- c.outstandingStreams[id] = stream
- c.Unlock()
+ var cctx context.T
+ var cancel context.CancelFunc
+
+ // TODO(mattr): To be consistent with go, we should not ignore 0 timeouts.
+ // However as a rollout strategy we must, otherwise there is a circular
+ // dependency between the WSPR change and the JS change that will follow.
+ if veyronTempMsg.Timeout == lib.JSIPCNoTimeout || veyronTempMsg.Timeout == 0 {
+ cctx, cancel = ctx.WithCancel()
+ } else {
+ cctx, cancel = ctx.WithTimeout(lib.JSToGoDuration(veyronTempMsg.Timeout))
}
- go c.sendVeyronRequest(ctx, id, veyronTempMsg, w, stream)
+ request := &outstandingRequest{
+ cancel: cancel,
+ }
+ if veyronTempMsg.IsStreaming {
+ // If this rpc is streaming, we expect the client to try to send on this
+ // stream. Since the initial handshake is done asynchronously, we have to
+ // put the outstanding stream in the map before we make the async call so
+ // that future sends know which queue to write to, even if the client call
+ // isn't actually ready yet.
+ request.stream = newStream()
+ }
+ c.Lock()
+ c.outstandingRequests[id] = request
+ c.Unlock()
+
+ // Pass request.stream (nil for non-streaming calls) so the goroutine uses
+ // the same stream that SendOnStream and CloseStream look up in the map.
+ go c.sendVeyronRequest(cctx, id, veyronTempMsg, w, request.stream)
+}
+
+// HandleVeyronCancellation cancels the request corresponding to the
+// given id if it is still outstanding.
+func (c *Controller) HandleVeyronCancellation(id int64) {
+ c.Lock()
+ defer c.Unlock()
+ if request, ok := c.outstandingRequests[id]; ok && request.cancel != nil {
+ request.cancel()
+ }
}
// CloseStream closes the stream for a given id.
func (c *Controller) CloseStream(id int64) {
c.Lock()
defer c.Unlock()
- stream := c.outstandingStreams[id]
- if stream == nil {
- c.logger.Errorf("close called on non-existent call: %v", id)
+ if request, ok := c.outstandingRequests[id]; ok && request.stream != nil {
+ request.stream.end()
return
}
-
- stream.end()
+ c.logger.Errorf("close called on non-existent call: %v", id)
}
func (c *Controller) maybeCreateServer(serverId uint64) (*server.Server, error) {